mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 17:59:23 -04:00
[Ingest Manager] Use long polling for agent checkin (#68922)
This commit is contained in:
parent
550b95f172
commit
0c477478a4
23 changed files with 534 additions and 353 deletions
|
@ -14,3 +14,5 @@ export const AGENT_TYPE_TEMPORARY = 'TEMPORARY';
|
|||
|
||||
export const AGENT_POLLING_THRESHOLD_MS = 30000;
|
||||
export const AGENT_POLLING_INTERVAL = 1000;
|
||||
export const AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS = 30000;
|
||||
export const AGENT_UPDATE_ACTIONS_INTERVAL_MS = 5000;
|
||||
|
|
|
@ -15,6 +15,7 @@ export interface IngestManagerConfigType {
|
|||
fleet: {
|
||||
enabled: boolean;
|
||||
tlsCheckDisabled: boolean;
|
||||
pollingRequestTimeout: number;
|
||||
kibana: {
|
||||
host?: string;
|
||||
ca_sha256?: string;
|
||||
|
|
|
@ -3,5 +3,8 @@
|
|||
"name": "ingest-manager",
|
||||
"version": "8.0.0",
|
||||
"private": true,
|
||||
"license": "Elastic-License"
|
||||
"license": "Elastic-License",
|
||||
"dependencies": {
|
||||
"abort-controller": "^3.0.0"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,6 +9,8 @@ export {
|
|||
AGENT_TYPE_TEMPORARY,
|
||||
AGENT_POLLING_THRESHOLD_MS,
|
||||
AGENT_POLLING_INTERVAL,
|
||||
AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS,
|
||||
AGENT_UPDATE_ACTIONS_INTERVAL_MS,
|
||||
INDEX_PATTERN_PLACEHOLDER_SUFFIX,
|
||||
// Routes
|
||||
PLUGIN_ID,
|
||||
|
|
|
@ -27,6 +27,7 @@ export const config = {
|
|||
fleet: schema.object({
|
||||
enabled: schema.boolean({ defaultValue: true }),
|
||||
tlsCheckDisabled: schema.boolean({ defaultValue: false }),
|
||||
pollingRequestTimeout: schema.number({ defaultValue: 60000 }),
|
||||
kibana: schema.object({
|
||||
host: schema.maybe(schema.string()),
|
||||
ca_sha256: schema.maybe(schema.string()),
|
||||
|
|
|
@ -55,6 +55,7 @@ import {
|
|||
} from './services';
|
||||
import { getAgentStatusById } from './services/agents';
|
||||
import { CloudSetup } from '../../cloud/server';
|
||||
import { agentCheckinState } from './services/agents/checkin/state';
|
||||
|
||||
export interface IngestManagerSetupDeps {
|
||||
licensing: LicensingPluginSetup;
|
||||
|
@ -229,6 +230,8 @@ export class IngestManagerPlugin
|
|||
logger: this.logger,
|
||||
});
|
||||
licenseService.start(this.licensing$);
|
||||
agentCheckinState.start();
|
||||
|
||||
return {
|
||||
esIndexPatternService: new ESIndexPatternSavedObjectService(),
|
||||
agentService: {
|
||||
|
@ -240,5 +243,6 @@ export class IngestManagerPlugin
|
|||
public async stop() {
|
||||
appContextService.stop();
|
||||
licenseService.stop();
|
||||
agentCheckinState.stop();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,8 +4,9 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { RequestHandler, KibanaRequest } from 'src/core/server';
|
||||
import { RequestHandler } from 'src/core/server';
|
||||
import { TypeOf } from '@kbn/config-schema';
|
||||
import { AbortController } from 'abort-controller';
|
||||
import {
|
||||
GetAgentsResponse,
|
||||
GetOneAgentResponse,
|
||||
|
@ -32,13 +33,6 @@ import * as AgentService from '../../services/agents';
|
|||
import * as APIKeyService from '../../services/api_keys';
|
||||
import { appContextService } from '../../services/app_context';
|
||||
|
||||
export function getInternalUserSOClient(request: KibanaRequest) {
|
||||
// soClient as kibana internal users, be carefull on how you use it, security is not enabled
|
||||
return appContextService.getSavedObjects().getScopedClient(request, {
|
||||
excludedWrappers: ['security'],
|
||||
});
|
||||
}
|
||||
|
||||
export const getAgentHandler: RequestHandler<TypeOf<
|
||||
typeof GetOneAgentRequestSchema.params
|
||||
>> = async (context, request, response) => {
|
||||
|
@ -176,14 +170,20 @@ export const postAgentCheckinHandler: RequestHandler<
|
|||
TypeOf<typeof PostAgentCheckinRequestSchema.body>
|
||||
> = async (context, request, response) => {
|
||||
try {
|
||||
const soClient = getInternalUserSOClient(request);
|
||||
const soClient = appContextService.getInternalUserSOClient(request);
|
||||
const res = APIKeyService.parseApiKeyFromHeaders(request.headers);
|
||||
const agent = await AgentService.getAgentByAccessAPIKeyId(soClient, res.apiKeyId);
|
||||
const abortController = new AbortController();
|
||||
request.events.aborted$.subscribe(() => {
|
||||
abortController.abort();
|
||||
});
|
||||
const signal = abortController.signal;
|
||||
const { actions } = await AgentService.agentCheckin(
|
||||
soClient,
|
||||
agent,
|
||||
request.body.events || [],
|
||||
request.body.local_metadata
|
||||
request.body.local_metadata,
|
||||
{ signal }
|
||||
);
|
||||
const body: PostAgentCheckinResponse = {
|
||||
action: 'checkin',
|
||||
|
@ -198,16 +198,24 @@ export const postAgentCheckinHandler: RequestHandler<
|
|||
};
|
||||
|
||||
return response.ok({ body });
|
||||
} catch (e) {
|
||||
if (e.isBoom && e.output.statusCode === 404) {
|
||||
return response.notFound({
|
||||
body: { message: `Agent ${request.params.agentId} not found` },
|
||||
} catch (err) {
|
||||
const logger = appContextService.getLogger();
|
||||
if (err.isBoom) {
|
||||
if (err.output.statusCode >= 500) {
|
||||
logger.error(err);
|
||||
}
|
||||
|
||||
return response.customError({
|
||||
statusCode: err.output.statusCode,
|
||||
body: { message: err.output.payload.message },
|
||||
});
|
||||
}
|
||||
|
||||
logger.error(err);
|
||||
|
||||
return response.customError({
|
||||
statusCode: 500,
|
||||
body: { message: e.message },
|
||||
body: { message: err.message },
|
||||
});
|
||||
}
|
||||
};
|
||||
|
@ -218,7 +226,7 @@ export const postAgentEnrollHandler: RequestHandler<
|
|||
TypeOf<typeof PostAgentEnrollRequestSchema.body>
|
||||
> = async (context, request, response) => {
|
||||
try {
|
||||
const soClient = getInternalUserSOClient(request);
|
||||
const soClient = appContextService.getInternalUserSOClient(request);
|
||||
const { apiKeyId } = APIKeyService.parseApiKeyFromHeaders(request.headers);
|
||||
const enrollmentAPIKey = await APIKeyService.getEnrollmentAPIKeyById(soClient, apiKeyId);
|
||||
|
||||
|
|
|
@ -35,12 +35,12 @@ import {
|
|||
postAgentEnrollHandler,
|
||||
postAgentsUnenrollHandler,
|
||||
getAgentStatusForConfigHandler,
|
||||
getInternalUserSOClient,
|
||||
putAgentsReassignHandler,
|
||||
} from './handlers';
|
||||
import { postAgentAcksHandlerBuilder } from './acks_handlers';
|
||||
import * as AgentService from '../../services/agents';
|
||||
import { postNewAgentActionHandlerBuilder } from './actions_handlers';
|
||||
import { appContextService } from '../../services';
|
||||
|
||||
export const registerRoutes = (router: IRouter) => {
|
||||
// Get one
|
||||
|
@ -110,7 +110,9 @@ export const registerRoutes = (router: IRouter) => {
|
|||
postAgentAcksHandlerBuilder({
|
||||
acknowledgeAgentActions: AgentService.acknowledgeAgentActions,
|
||||
getAgentByAccessAPIKeyId: AgentService.getAgentByAccessAPIKeyId,
|
||||
getSavedObjectsClientContract: getInternalUserSOClient,
|
||||
getSavedObjectsClientContract: appContextService.getInternalUserSOClient.bind(
|
||||
appContextService
|
||||
),
|
||||
saveAgentEvents: AgentService.saveAgentEvents,
|
||||
})
|
||||
);
|
||||
|
|
|
@ -65,8 +65,6 @@ class AgentConfigService {
|
|||
updated_by: user ? user.username : 'system',
|
||||
});
|
||||
|
||||
await this.triggerAgentConfigUpdatedEvent(soClient, 'updated', id);
|
||||
|
||||
return (await this.get(soClient, id)) as AgentConfig;
|
||||
}
|
||||
|
||||
|
|
|
@ -77,6 +77,15 @@ export async function getAgentActionByIds(
|
|||
);
|
||||
}
|
||||
|
||||
export async function getNewActionsSince(soClient: SavedObjectsClientContract, timestamp: string) {
|
||||
const res = await soClient.find<AgentActionSOAttributes>({
|
||||
type: AGENT_ACTION_SAVED_OBJECT_TYPE,
|
||||
filter: `not ${AGENT_ACTION_SAVED_OBJECT_TYPE}.attributes.sent_at: * AND ${AGENT_ACTION_SAVED_OBJECT_TYPE}.attributes.created_at >= "${timestamp}"`,
|
||||
});
|
||||
|
||||
return res.saved_objects.map(savedObjectToAgentAction);
|
||||
}
|
||||
|
||||
export interface ActionsService {
|
||||
getAgent: (soClient: SavedObjectsClientContract, agentId: string) => Promise<Agent>;
|
||||
|
||||
|
|
|
@ -1,136 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { shouldCreateConfigAction } from './checkin';
|
||||
import { Agent } from '../../types';
|
||||
|
||||
function getAgent(data: Partial<Agent>) {
|
||||
return { actions: [], ...data } as Agent;
|
||||
}
|
||||
|
||||
describe('Agent checkin service', () => {
|
||||
describe('shouldCreateConfigAction', () => {
|
||||
it('should return false if the agent do not have an assigned config', () => {
|
||||
const res = shouldCreateConfigAction(getAgent({}), []);
|
||||
|
||||
expect(res).toBeFalsy();
|
||||
});
|
||||
|
||||
it('should return true if this is agent first checkin', () => {
|
||||
const res = shouldCreateConfigAction(getAgent({ config_id: 'config1' }), []);
|
||||
|
||||
expect(res).toBeTruthy();
|
||||
});
|
||||
|
||||
it('should return false agent is already running latest revision', () => {
|
||||
const res = shouldCreateConfigAction(
|
||||
getAgent({
|
||||
config_id: 'config1',
|
||||
last_checkin: '2018-01-02T00:00:00',
|
||||
config_revision: 1,
|
||||
config_newest_revision: 1,
|
||||
}),
|
||||
[]
|
||||
);
|
||||
|
||||
expect(res).toBeFalsy();
|
||||
});
|
||||
|
||||
it('should return false agent has already latest revision config change action', () => {
|
||||
const res = shouldCreateConfigAction(
|
||||
getAgent({
|
||||
config_id: 'config1',
|
||||
last_checkin: '2018-01-02T00:00:00',
|
||||
config_revision: 1,
|
||||
config_newest_revision: 2,
|
||||
}),
|
||||
[
|
||||
{
|
||||
id: 'action1',
|
||||
agent_id: 'agent1',
|
||||
type: 'CONFIG_CHANGE',
|
||||
created_at: new Date().toISOString(),
|
||||
data: {
|
||||
config: {
|
||||
id: 'config1',
|
||||
revision: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
]
|
||||
);
|
||||
|
||||
expect(res).toBeFalsy();
|
||||
});
|
||||
|
||||
it('should return true agent has unrelated config change actions', () => {
|
||||
const res = shouldCreateConfigAction(
|
||||
getAgent({
|
||||
config_id: 'config1',
|
||||
last_checkin: '2018-01-02T00:00:00',
|
||||
config_revision: 1,
|
||||
config_newest_revision: 2,
|
||||
}),
|
||||
[
|
||||
{
|
||||
id: 'action1',
|
||||
agent_id: 'agent1',
|
||||
type: 'CONFIG_CHANGE',
|
||||
created_at: new Date().toISOString(),
|
||||
data: {
|
||||
config: {
|
||||
id: 'config2',
|
||||
revision: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
id: 'action1',
|
||||
agent_id: 'agent1',
|
||||
type: 'CONFIG_CHANGE',
|
||||
created_at: new Date().toISOString(),
|
||||
data: {
|
||||
config: {
|
||||
id: 'config1',
|
||||
revision: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
]
|
||||
);
|
||||
|
||||
expect(res).toBeTruthy();
|
||||
});
|
||||
|
||||
it('should return true if this agent has a new revision', () => {
|
||||
const res = shouldCreateConfigAction(
|
||||
getAgent({
|
||||
config_id: 'config1',
|
||||
last_checkin: '2018-01-02T00:00:00',
|
||||
config_revision: 1,
|
||||
config_newest_revision: 2,
|
||||
}),
|
||||
[]
|
||||
);
|
||||
|
||||
expect(res).toBeTruthy();
|
||||
});
|
||||
|
||||
it('should return true if this agent has no revision currently set', () => {
|
||||
const res = shouldCreateConfigAction(
|
||||
getAgent({
|
||||
config_id: 'config1',
|
||||
last_checkin: '2018-01-02T00:00:00',
|
||||
config_revision: null,
|
||||
config_newest_revision: 2,
|
||||
}),
|
||||
[]
|
||||
);
|
||||
|
||||
expect(res).toBeTruthy();
|
||||
});
|
||||
});
|
||||
});
|
|
@ -1,193 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { SavedObjectsClientContract, SavedObjectsBulkCreateObject } from 'src/core/server';
|
||||
import {
|
||||
Agent,
|
||||
NewAgentEvent,
|
||||
AgentEvent,
|
||||
AgentAction,
|
||||
AgentSOAttributes,
|
||||
AgentEventSOAttributes,
|
||||
AgentMetadata,
|
||||
} from '../../types';
|
||||
|
||||
import { agentConfigService } from '../agent_config';
|
||||
import * as APIKeysService from '../api_keys';
|
||||
import { AGENT_SAVED_OBJECT_TYPE, AGENT_EVENT_SAVED_OBJECT_TYPE } from '../../constants';
|
||||
import { getAgentActionsForCheckin, createAgentAction } from './actions';
|
||||
import { appContextService } from '../app_context';
|
||||
|
||||
export async function agentCheckin(
|
||||
soClient: SavedObjectsClientContract,
|
||||
agent: Agent,
|
||||
events: NewAgentEvent[],
|
||||
localMetadata?: any
|
||||
) {
|
||||
const updateData: {
|
||||
last_checkin: string;
|
||||
default_api_key?: string;
|
||||
default_api_key_id?: string;
|
||||
local_metadata?: AgentMetadata;
|
||||
current_error_events?: string;
|
||||
} = {
|
||||
last_checkin: new Date().toISOString(),
|
||||
};
|
||||
|
||||
const actions = await getAgentActionsForCheckin(soClient, agent.id);
|
||||
|
||||
// Generate new agent config if config is updated
|
||||
if (agent.config_id && shouldCreateConfigAction(agent, actions)) {
|
||||
const {
|
||||
attributes: { default_api_key: defaultApiKey },
|
||||
} = await appContextService
|
||||
.getEncryptedSavedObjects()
|
||||
.getDecryptedAsInternalUser<AgentSOAttributes>(AGENT_SAVED_OBJECT_TYPE, agent.id);
|
||||
|
||||
const config = await agentConfigService.getFullConfig(soClient, agent.config_id);
|
||||
if (config) {
|
||||
// Assign output API keys
|
||||
// We currently only support default ouput
|
||||
if (!defaultApiKey) {
|
||||
const outputAPIKey = await APIKeysService.generateOutputApiKey(
|
||||
soClient,
|
||||
'default',
|
||||
agent.id
|
||||
);
|
||||
updateData.default_api_key = outputAPIKey.key;
|
||||
updateData.default_api_key_id = outputAPIKey.id;
|
||||
}
|
||||
// Mutate the config to set the api token for this agent
|
||||
config.outputs.default.api_key = defaultApiKey || updateData.default_api_key;
|
||||
|
||||
const configChangeAction = await createAgentAction(soClient, {
|
||||
agent_id: agent.id,
|
||||
type: 'CONFIG_CHANGE',
|
||||
data: { config } as any,
|
||||
created_at: new Date().toISOString(),
|
||||
sent_at: undefined,
|
||||
});
|
||||
actions.push(configChangeAction);
|
||||
}
|
||||
}
|
||||
|
||||
const { updatedErrorEvents } = await processEventsForCheckin(soClient, agent, events);
|
||||
|
||||
// Persist changes
|
||||
if (updatedErrorEvents) {
|
||||
updateData.current_error_events = JSON.stringify(updatedErrorEvents);
|
||||
}
|
||||
|
||||
await soClient.update<AgentSOAttributes>(AGENT_SAVED_OBJECT_TYPE, agent.id, updateData);
|
||||
|
||||
return { actions };
|
||||
}
|
||||
|
||||
async function processEventsForCheckin(
|
||||
soClient: SavedObjectsClientContract,
|
||||
agent: Agent,
|
||||
events: NewAgentEvent[]
|
||||
) {
|
||||
const acknowledgedActionIds: string[] = [];
|
||||
const updatedErrorEvents: Array<AgentEvent | NewAgentEvent> = [...agent.current_error_events];
|
||||
for (const event of events) {
|
||||
// @ts-ignore
|
||||
event.config_id = agent.config_id;
|
||||
|
||||
if (isActionEvent(event)) {
|
||||
acknowledgedActionIds.push(event.action_id as string);
|
||||
}
|
||||
|
||||
if (isErrorOrState(event)) {
|
||||
// Remove any global or specific to a stream event
|
||||
const existingEventIndex = updatedErrorEvents.findIndex(
|
||||
(e) => e.stream_id === event.stream_id
|
||||
);
|
||||
if (existingEventIndex >= 0) {
|
||||
updatedErrorEvents.splice(existingEventIndex, 1);
|
||||
}
|
||||
if (event.type === 'ERROR') {
|
||||
updatedErrorEvents.push(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (events.length > 0) {
|
||||
await createEventsForAgent(soClient, agent.id, events);
|
||||
}
|
||||
|
||||
return {
|
||||
acknowledgedActionIds,
|
||||
updatedErrorEvents,
|
||||
};
|
||||
}
|
||||
|
||||
async function createEventsForAgent(
|
||||
soClient: SavedObjectsClientContract,
|
||||
agentId: string,
|
||||
events: NewAgentEvent[]
|
||||
) {
|
||||
const objects: Array<SavedObjectsBulkCreateObject<AgentEventSOAttributes>> = events.map(
|
||||
(eventData) => {
|
||||
return {
|
||||
attributes: {
|
||||
...eventData,
|
||||
payload: eventData.payload ? JSON.stringify(eventData.payload) : undefined,
|
||||
},
|
||||
type: AGENT_EVENT_SAVED_OBJECT_TYPE,
|
||||
};
|
||||
}
|
||||
);
|
||||
|
||||
return soClient.bulkCreate(objects);
|
||||
}
|
||||
|
||||
function isErrorOrState(event: AgentEvent | NewAgentEvent) {
|
||||
return event.type === 'STATE' || event.type === 'ERROR';
|
||||
}
|
||||
|
||||
function isActionEvent(event: AgentEvent | NewAgentEvent) {
|
||||
return (
|
||||
event.type === 'ACTION' && (event.subtype === 'ACKNOWLEDGED' || event.subtype === 'UNKNOWN')
|
||||
);
|
||||
}
|
||||
|
||||
export function shouldCreateConfigAction(agent: Agent, actions: AgentAction[]): boolean {
|
||||
if (!agent.config_id) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const isFirstCheckin = !agent.last_checkin;
|
||||
if (isFirstCheckin) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const isAgentConfigOutdated =
|
||||
// Config reassignment
|
||||
(!agent.config_revision && agent.config_newest_revision) ||
|
||||
// new revision of a config
|
||||
(agent.config_revision &&
|
||||
agent.config_newest_revision &&
|
||||
agent.config_revision < agent.config_newest_revision);
|
||||
|
||||
if (!isAgentConfigOutdated) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const isActionAlreadyGenerated = !!actions.find((action) => {
|
||||
if (!action.data || action.type !== 'CONFIG_CHANGE') {
|
||||
return false;
|
||||
}
|
||||
|
||||
const { data } = action;
|
||||
|
||||
return (
|
||||
data.config.id === agent.config_id && data.config.revision === agent.config_newest_revision
|
||||
);
|
||||
});
|
||||
|
||||
return !isActionAlreadyGenerated;
|
||||
}
|
|
@ -0,0 +1,110 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { SavedObjectsClientContract, SavedObjectsBulkCreateObject } from 'src/core/server';
|
||||
import {
|
||||
Agent,
|
||||
NewAgentEvent,
|
||||
AgentEvent,
|
||||
AgentSOAttributes,
|
||||
AgentEventSOAttributes,
|
||||
AgentMetadata,
|
||||
} from '../../../types';
|
||||
|
||||
import { AGENT_SAVED_OBJECT_TYPE, AGENT_EVENT_SAVED_OBJECT_TYPE } from '../../../constants';
|
||||
import { agentCheckinState } from './state';
|
||||
import { getAgentActionsForCheckin } from '../actions';
|
||||
|
||||
export async function agentCheckin(
|
||||
soClient: SavedObjectsClientContract,
|
||||
agent: Agent,
|
||||
events: NewAgentEvent[],
|
||||
localMetadata?: any,
|
||||
options?: { signal: AbortSignal }
|
||||
) {
|
||||
const updateData: {
|
||||
local_metadata?: AgentMetadata;
|
||||
current_error_events?: string;
|
||||
} = {};
|
||||
const { updatedErrorEvents } = await processEventsForCheckin(soClient, agent, events);
|
||||
if (updatedErrorEvents) {
|
||||
updateData.current_error_events = JSON.stringify(updatedErrorEvents);
|
||||
}
|
||||
if (localMetadata) {
|
||||
updateData.local_metadata = localMetadata;
|
||||
}
|
||||
if (Object.keys(updateData).length > 0) {
|
||||
await soClient.update<AgentSOAttributes>(AGENT_SAVED_OBJECT_TYPE, agent.id, updateData);
|
||||
}
|
||||
|
||||
// Check if some actions are not acknowledged
|
||||
let actions = await getAgentActionsForCheckin(soClient, agent.id);
|
||||
if (actions.length > 0) {
|
||||
return { actions };
|
||||
}
|
||||
|
||||
// Wait for new actions
|
||||
actions = await agentCheckinState.subscribeToNewActions(soClient, agent, options);
|
||||
|
||||
return { actions };
|
||||
}
|
||||
|
||||
async function processEventsForCheckin(
|
||||
soClient: SavedObjectsClientContract,
|
||||
agent: Agent,
|
||||
events: NewAgentEvent[]
|
||||
) {
|
||||
const updatedErrorEvents: Array<AgentEvent | NewAgentEvent> = [...agent.current_error_events];
|
||||
for (const event of events) {
|
||||
// @ts-ignore
|
||||
event.config_id = agent.config_id;
|
||||
|
||||
if (isErrorOrState(event)) {
|
||||
// Remove any global or specific to a stream event
|
||||
const existingEventIndex = updatedErrorEvents.findIndex(
|
||||
(e) => e.stream_id === event.stream_id
|
||||
);
|
||||
if (existingEventIndex >= 0) {
|
||||
updatedErrorEvents.splice(existingEventIndex, 1);
|
||||
}
|
||||
if (event.type === 'ERROR') {
|
||||
updatedErrorEvents.push(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (events.length > 0) {
|
||||
await createEventsForAgent(soClient, agent.id, events);
|
||||
}
|
||||
|
||||
return {
|
||||
updatedErrorEvents,
|
||||
};
|
||||
}
|
||||
|
||||
async function createEventsForAgent(
|
||||
soClient: SavedObjectsClientContract,
|
||||
agentId: string,
|
||||
events: NewAgentEvent[]
|
||||
) {
|
||||
const objects: Array<SavedObjectsBulkCreateObject<AgentEventSOAttributes>> = events.map(
|
||||
(eventData) => {
|
||||
return {
|
||||
attributes: {
|
||||
...eventData,
|
||||
payload: eventData.payload ? JSON.stringify(eventData.payload) : undefined,
|
||||
},
|
||||
type: AGENT_EVENT_SAVED_OBJECT_TYPE,
|
||||
};
|
||||
}
|
||||
);
|
||||
|
||||
return soClient.bulkCreate(objects);
|
||||
}
|
||||
|
||||
function isErrorOrState(event: AgentEvent | NewAgentEvent) {
|
||||
return event.type === 'STATE' || event.type === 'ERROR';
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
import { Observable } from 'rxjs';
|
||||
|
||||
export class AbortError extends Error {}
|
||||
|
||||
export const toPromiseAbortable = <T>(
|
||||
observable: Observable<T>,
|
||||
signal?: AbortSignal
|
||||
): Promise<T> =>
|
||||
new Promise((resolve, reject) => {
|
||||
if (signal && signal.aborted) {
|
||||
reject(new AbortError('Aborted'));
|
||||
return;
|
||||
}
|
||||
|
||||
const listener = () => {
|
||||
subscription.unsubscribe();
|
||||
reject(new AbortError('Aborted'));
|
||||
};
|
||||
const cleanup = () => {
|
||||
if (signal) {
|
||||
signal.removeEventListener('abort', listener);
|
||||
}
|
||||
};
|
||||
const subscription = observable.subscribe(
|
||||
(data) => {
|
||||
cleanup();
|
||||
resolve(data);
|
||||
},
|
||||
(err) => {
|
||||
cleanup();
|
||||
reject(err);
|
||||
}
|
||||
);
|
||||
|
||||
if (signal) {
|
||||
signal.addEventListener('abort', listener, { once: true });
|
||||
}
|
||||
});
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { SavedObjectsClientContract } from 'src/core/server';
|
||||
import { Agent } from '../../../types';
|
||||
import { appContextService } from '../../app_context';
|
||||
import { agentCheckinStateConnectedAgentsFactory } from './state_connected_agents';
|
||||
import { agentCheckinStateNewActionsFactory } from './state_new_actions';
|
||||
import { AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS } from '../../../constants';
|
||||
|
||||
function agentCheckinStateFactory() {
|
||||
const agentConnected = agentCheckinStateConnectedAgentsFactory();
|
||||
const newActions = agentCheckinStateNewActionsFactory();
|
||||
let interval: NodeJS.Timeout;
|
||||
function start() {
|
||||
interval = setInterval(async () => {
|
||||
try {
|
||||
await agentConnected.updateLastCheckinAt();
|
||||
} catch (err) {
|
||||
appContextService.getLogger().error(err);
|
||||
}
|
||||
}, AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS);
|
||||
}
|
||||
|
||||
function stop() {
|
||||
if (interval) {
|
||||
clearInterval(interval);
|
||||
}
|
||||
}
|
||||
return {
|
||||
subscribeToNewActions: (
|
||||
soClient: SavedObjectsClientContract,
|
||||
agent: Agent,
|
||||
options?: { signal: AbortSignal }
|
||||
) =>
|
||||
agentConnected.wrapPromise(
|
||||
agent.id,
|
||||
newActions.subscribeToNewActions(soClient, agent, options)
|
||||
),
|
||||
start,
|
||||
stop,
|
||||
};
|
||||
}
|
||||
|
||||
export const agentCheckinState = agentCheckinStateFactory();
|
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { KibanaRequest, SavedObjectsBulkUpdateObject } from 'src/core/server';
|
||||
import { appContextService } from '../../app_context';
|
||||
import { AgentSOAttributes } from '../../../types';
|
||||
import { AGENT_SAVED_OBJECT_TYPE } from '../../../constants';
|
||||
|
||||
function getInternalUserSOClient() {
|
||||
const fakeRequest = ({
|
||||
headers: {},
|
||||
getBasePath: () => '',
|
||||
path: '/',
|
||||
route: { settings: {} },
|
||||
url: {
|
||||
href: '/',
|
||||
},
|
||||
raw: {
|
||||
req: {
|
||||
url: '/',
|
||||
},
|
||||
},
|
||||
} as unknown) as KibanaRequest;
|
||||
|
||||
return appContextService.getInternalUserSOClient(fakeRequest);
|
||||
}
|
||||
export function agentCheckinStateConnectedAgentsFactory() {
|
||||
const connectedAgentsIds = new Set<string>();
|
||||
let agentToUpdate = new Set<string>();
|
||||
|
||||
function addAgent(agentId: string) {
|
||||
connectedAgentsIds.add(agentId);
|
||||
agentToUpdate.add(agentId);
|
||||
}
|
||||
|
||||
function removeAgent(agentId: string) {
|
||||
connectedAgentsIds.delete(agentId);
|
||||
}
|
||||
|
||||
async function wrapPromise<T>(agentId: string, p: Promise<T>): Promise<T> {
|
||||
try {
|
||||
addAgent(agentId);
|
||||
const res = await p;
|
||||
removeAgent(agentId);
|
||||
return res;
|
||||
} catch (err) {
|
||||
removeAgent(agentId);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
async function updateLastCheckinAt() {
|
||||
if (agentToUpdate.size === 0) {
|
||||
return;
|
||||
}
|
||||
const internalSOClient = getInternalUserSOClient();
|
||||
const now = new Date().toISOString();
|
||||
const updates: Array<SavedObjectsBulkUpdateObject<AgentSOAttributes>> = [
|
||||
...connectedAgentsIds.values(),
|
||||
].map((agentId) => ({
|
||||
type: AGENT_SAVED_OBJECT_TYPE,
|
||||
id: agentId,
|
||||
attributes: {
|
||||
last_checkin: now,
|
||||
},
|
||||
}));
|
||||
|
||||
agentToUpdate = new Set<string>([...connectedAgentsIds.values()]);
|
||||
await internalSOClient.bulkUpdate<AgentSOAttributes>(updates, { refresh: false });
|
||||
}
|
||||
|
||||
return {
|
||||
wrapPromise,
|
||||
updateLastCheckinAt,
|
||||
};
|
||||
}
|
|
@ -0,0 +1,183 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { timer, from, Observable, TimeoutError } from 'rxjs';
|
||||
import {
|
||||
shareReplay,
|
||||
distinctUntilKeyChanged,
|
||||
switchMap,
|
||||
mergeMap,
|
||||
merge,
|
||||
filter,
|
||||
timeout,
|
||||
take,
|
||||
} from 'rxjs/operators';
|
||||
import { SavedObjectsClientContract, KibanaRequest } from 'src/core/server';
|
||||
import {
|
||||
Agent,
|
||||
AgentAction,
|
||||
AgentSOAttributes,
|
||||
AgentConfig,
|
||||
FullAgentConfig,
|
||||
} from '../../../types';
|
||||
import { agentConfigService } from '../../agent_config';
|
||||
import * as APIKeysService from '../../api_keys';
|
||||
import { AGENT_SAVED_OBJECT_TYPE, AGENT_UPDATE_ACTIONS_INTERVAL_MS } from '../../../constants';
|
||||
import { createAgentAction, getNewActionsSince } from '../actions';
|
||||
import { appContextService } from '../../app_context';
|
||||
import { toPromiseAbortable, AbortError } from './rxjs_utils';
|
||||
|
||||
function getInternalUserSOClient() {
|
||||
const fakeRequest = ({
|
||||
headers: {},
|
||||
getBasePath: () => '',
|
||||
path: '/',
|
||||
route: { settings: {} },
|
||||
url: {
|
||||
href: '/',
|
||||
},
|
||||
raw: {
|
||||
req: {
|
||||
url: '/',
|
||||
},
|
||||
},
|
||||
} as unknown) as KibanaRequest;
|
||||
|
||||
return appContextService.getInternalUserSOClient(fakeRequest);
|
||||
}
|
||||
|
||||
function createAgentConfigSharedObservable(configId: string) {
|
||||
const internalSOClient = getInternalUserSOClient();
|
||||
return timer(0, AGENT_UPDATE_ACTIONS_INTERVAL_MS).pipe(
|
||||
switchMap(() =>
|
||||
from(agentConfigService.get(internalSOClient, configId) as Promise<AgentConfig>)
|
||||
),
|
||||
distinctUntilKeyChanged('revision'),
|
||||
switchMap((data) => from(agentConfigService.getFullConfig(internalSOClient, configId))),
|
||||
shareReplay({ refCount: true, bufferSize: 1 })
|
||||
);
|
||||
}
|
||||
|
||||
function createNewActionsSharedObservable(): Observable<AgentAction[]> {
|
||||
return timer(0, AGENT_UPDATE_ACTIONS_INTERVAL_MS).pipe(
|
||||
switchMap(() => {
|
||||
const internalSOClient = getInternalUserSOClient();
|
||||
|
||||
return from(getNewActionsSince(internalSOClient, new Date().toISOString()));
|
||||
}),
|
||||
shareReplay({ refCount: true, bufferSize: 1 })
|
||||
);
|
||||
}
|
||||
|
||||
async function getOrCreateAgentDefaultOutputAPIKey(
|
||||
soClient: SavedObjectsClientContract,
|
||||
agent: Agent
|
||||
): Promise<string> {
|
||||
const {
|
||||
attributes: { default_api_key: defaultApiKey },
|
||||
} = await appContextService
|
||||
.getEncryptedSavedObjects()
|
||||
.getDecryptedAsInternalUser<AgentSOAttributes>(AGENT_SAVED_OBJECT_TYPE, agent.id);
|
||||
|
||||
if (defaultApiKey) {
|
||||
return defaultApiKey;
|
||||
}
|
||||
|
||||
const outputAPIKey = await APIKeysService.generateOutputApiKey(soClient, 'default', agent.id);
|
||||
await soClient.update<AgentSOAttributes>(AGENT_SAVED_OBJECT_TYPE, agent.id, {
|
||||
default_api_key: outputAPIKey.key,
|
||||
default_api_key_id: outputAPIKey.id,
|
||||
});
|
||||
|
||||
return outputAPIKey.key;
|
||||
}
|
||||
|
||||
async function createAgentActionFromConfigIfOutdated(
|
||||
soClient: SavedObjectsClientContract,
|
||||
agent: Agent,
|
||||
config: FullAgentConfig | null
|
||||
) {
|
||||
if (!config || !config.revision) {
|
||||
return;
|
||||
}
|
||||
const isAgentConfigOutdated = !agent.config_revision || agent.config_revision < config.revision;
|
||||
if (!isAgentConfigOutdated) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Deep clone !not supporting Date, and undefined value.
|
||||
const newConfig = JSON.parse(JSON.stringify(config));
|
||||
|
||||
// Mutate the config to set the api token for this agent
|
||||
newConfig.outputs.default.api_key = await getOrCreateAgentDefaultOutputAPIKey(soClient, agent);
|
||||
|
||||
const configChangeAction = await createAgentAction(soClient, {
|
||||
agent_id: agent.id,
|
||||
type: 'CONFIG_CHANGE',
|
||||
data: { config: newConfig } as any,
|
||||
created_at: new Date().toISOString(),
|
||||
sent_at: undefined,
|
||||
});
|
||||
|
||||
return [configChangeAction];
|
||||
}
|
||||
|
||||
export function agentCheckinStateNewActionsFactory() {
|
||||
// Shared Observables
|
||||
const agentConfigs$ = new Map<string, Observable<FullAgentConfig | null>>();
|
||||
const newActions$ = createNewActionsSharedObservable();
|
||||
|
||||
async function subscribeToNewActions(
|
||||
soClient: SavedObjectsClientContract,
|
||||
agent: Agent,
|
||||
options?: { signal: AbortSignal }
|
||||
): Promise<AgentAction[]> {
|
||||
if (!agent.config_id) {
|
||||
throw new Error('Agent do not have a config');
|
||||
}
|
||||
const configId = agent.config_id;
|
||||
if (!agentConfigs$.has(configId)) {
|
||||
agentConfigs$.set(configId, createAgentConfigSharedObservable(configId));
|
||||
}
|
||||
const agentConfig$ = agentConfigs$.get(configId);
|
||||
if (!agentConfig$) {
|
||||
throw new Error(`Invalid state no observable for config ${configId}`);
|
||||
}
|
||||
const stream$ = agentConfig$.pipe(
|
||||
timeout(appContextService.getConfig()?.fleet.pollingRequestTimeout || 0),
|
||||
mergeMap((config) => createAgentActionFromConfigIfOutdated(soClient, agent, config)),
|
||||
merge(newActions$),
|
||||
mergeMap(async (data) => {
|
||||
if (!data) {
|
||||
return;
|
||||
}
|
||||
const newActions = data.filter((action) => action.agent_id);
|
||||
if (newActions.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
return newActions;
|
||||
}),
|
||||
filter((data) => data !== undefined),
|
||||
take(1)
|
||||
);
|
||||
try {
|
||||
const data = await toPromiseAbortable(stream$, options?.signal);
|
||||
|
||||
return data || [];
|
||||
} catch (err) {
|
||||
if (err instanceof TimeoutError || err instanceof AbortError) {
|
||||
return [];
|
||||
}
|
||||
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
subscribeToNewActions,
|
||||
};
|
||||
}
|
|
@ -5,7 +5,7 @@
|
|||
*/
|
||||
import { BehaviorSubject, Observable } from 'rxjs';
|
||||
import { first } from 'rxjs/operators';
|
||||
import { SavedObjectsServiceStart, HttpServiceSetup, Logger } from 'src/core/server';
|
||||
import { SavedObjectsServiceStart, HttpServiceSetup, Logger, KibanaRequest } from 'src/core/server';
|
||||
import {
|
||||
EncryptedSavedObjectsClient,
|
||||
EncryptedSavedObjectsPluginSetup,
|
||||
|
@ -89,6 +89,13 @@ class AppContextService {
|
|||
return this.savedObjects;
|
||||
}
|
||||
|
||||
public getInternalUserSOClient(request: KibanaRequest) {
|
||||
// soClient as kibana internal users, be carefull on how you use it, security is not enabled
|
||||
return appContextService.getSavedObjects().getScopedClient(request, {
|
||||
excludedWrappers: ['security'],
|
||||
});
|
||||
}
|
||||
|
||||
public getIsProductionMode() {
|
||||
return this.isProductionMode;
|
||||
}
|
||||
|
|
1
x-pack/plugins/ingest_manager/yarn.lock
Symbolic link
1
x-pack/plugins/ingest_manager/yarn.lock
Symbolic link
|
@ -0,0 +1 @@
|
|||
../../../yarn.lock
|
|
@ -8,7 +8,7 @@ import expect from '@kbn/expect';
|
|||
import uuid from 'uuid';
|
||||
|
||||
import { FtrProviderContext } from '../../../ftr_provider_context';
|
||||
import { getSupertestWithoutAuth } from './services';
|
||||
import { getSupertestWithoutAuth, setupIngest } from './services';
|
||||
|
||||
export default function (providerContext: FtrProviderContext) {
|
||||
const { getService } = providerContext;
|
||||
|
@ -44,6 +44,7 @@ export default function (providerContext: FtrProviderContext) {
|
|||
},
|
||||
});
|
||||
});
|
||||
setupIngest(providerContext);
|
||||
after(async () => {
|
||||
await esArchiver.unload('fleet/agents');
|
||||
});
|
||||
|
|
|
@ -29,6 +29,7 @@ export async function getApiIntegrationConfig({ readConfigFile }: FtrConfigProvi
|
|||
'--optimize.enabled=false',
|
||||
'--telemetry.optIn=true',
|
||||
'--xpack.ingestManager.enabled=true',
|
||||
'--xpack.ingestManager.fleet.pollingRequestTimeout=5000', // 5 seconds
|
||||
'--xpack.securitySolution.alertResultListDefaultDateRange.from=2018-01-10T00:00:00.000Z',
|
||||
],
|
||||
},
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
"access_api_key_id": "api-key-2",
|
||||
"active": true,
|
||||
"shared_id": "agent1_filebeat",
|
||||
"config_id": "1",
|
||||
"config_id": "config1",
|
||||
"type": "PERMANENT",
|
||||
"local_metadata": {},
|
||||
"user_provided_metadata": {}
|
||||
|
|
|
@ -6410,6 +6410,13 @@ abort-controller@^2.0.3:
|
|||
dependencies:
|
||||
event-target-shim "^5.0.0"
|
||||
|
||||
abort-controller@^3.0.0:
|
||||
version "3.0.0"
|
||||
resolved "https://registry.yarnpkg.com/abort-controller/-/abort-controller-3.0.0.tgz#eaf54d53b62bae4138e809ca225c8439a6efb392"
|
||||
integrity sha512-h8lQ8tacZYnR3vNQTgibj+tODHI5/+l06Au2Pcriv/Gmet0eaj4TwWH41sO9wnHDiQsEj19q0drzdWdeAHtweg==
|
||||
dependencies:
|
||||
event-target-shim "^5.0.0"
|
||||
|
||||
abortcontroller-polyfill@^1.4.0:
|
||||
version "1.4.0"
|
||||
resolved "https://registry.yarnpkg.com/abortcontroller-polyfill/-/abortcontroller-polyfill-1.4.0.tgz#0d5eb58e522a461774af8086414f68e1dda7a6c4"
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue