[EDR Workflows][Osquery] Use newly added action responses data stream (#183892)

**Prerequisite**: https://github.com/elastic/elasticsearch/pull/108849

**Follow-up**: https://github.com/elastic/integrations/pull/9661

This PR introduces a new index
`logs-osquery_manager.action.responses-default` for action responses.
This index will be added in Osquery Manager integration version `1.12`
and will replace the existing
`.logs-osquery_manager.action.responses-default`, which is currently
populated by a transform from `.fleet-actions`.

Since most users will still be using the old integration package, we
ensured that the implementation checks the old index first and returns
the response from there unless the new index is available. If the new
index is available, the response will come from it. This change ensures
compatibility with all user scenarios.
This commit is contained in:
Konrad Szwarc 2024-05-24 10:56:24 +02:00 committed by GitHub
parent 09a165f3e6
commit 9bafd068e0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 80 additions and 22 deletions

View file

@ -15,6 +15,7 @@ export const RESULTS_INDEX = `${OSQUERY_LOGS_BASE}.results`;
export const OSQUERY_ACTIONS_INDEX = `${ACTIONS_INDEX}-*`;
export const ACTION_RESPONSES_INDEX = `.logs-${OSQUERY_INTEGRATION_NAME}.action.responses`;
export const ACTION_RESPONSES_DATA_STREAM_INDEX = `logs-${OSQUERY_INTEGRATION_NAME}.action.responses`;
export const DEFAULT_PLATFORM = 'linux,windows,darwin';

View file

@ -83,4 +83,5 @@ export interface ActionResultsStrategyResponse
export interface ActionResultsRequestOptions extends RequestOptionsPaginated {
actionId: string;
startDate?: string;
useNewDataStream?: boolean;
}

View file

@ -9,7 +9,10 @@ import type { ISearchRequestParams } from '@kbn/search-types';
import { AGENT_ACTIONS_RESULTS_INDEX } from '@kbn/fleet-plugin/common';
import { isEmpty } from 'lodash';
import moment from 'moment';
import { ACTION_RESPONSES_INDEX } from '../../../../../../common/constants';
import {
ACTION_RESPONSES_DATA_STREAM_INDEX,
ACTION_RESPONSES_INDEX,
} from '../../../../../../common/constants';
import type { ActionResultsRequestOptions } from '../../../../../../common/search_strategy';
import { getQueryFilter } from '../../../../../utils/build_query';
@ -19,6 +22,7 @@ export const buildActionResultsQuery = ({
startDate,
sort,
componentTemplateExists,
useNewDataStream,
}: ActionResultsRequestOptions): ISearchRequestParams => {
let filter = `action_id: ${actionId}`;
if (!isEmpty(kuery)) {
@ -41,11 +45,18 @@ export const buildActionResultsQuery = ({
const filterQuery = [...timeRangeFilter, getQueryFilter({ filter })];
let index: string;
if (useNewDataStream) {
index = ACTION_RESPONSES_DATA_STREAM_INDEX;
} else if (componentTemplateExists) {
index = ACTION_RESPONSES_INDEX;
} else {
index = AGENT_ACTIONS_RESULTS_INDEX;
}
return {
allow_no_indices: true,
index: componentTemplateExists
? `${ACTION_RESPONSES_INDEX}-default*`
: `${AGENT_ACTIONS_RESULTS_INDEX}*`,
index,
ignore_unavailable: true,
body: {
aggs: {

View file

@ -5,17 +5,18 @@
* 2.0.
*/
import { from, map, mergeMap } from 'rxjs';
import { map, mergeMap, forkJoin, from, of } from 'rxjs';
import type { ISearchStrategy, PluginStart } from '@kbn/data-plugin/server';
import { shimHitsTotal } from '@kbn/data-plugin/server';
import { ENHANCED_ES_SEARCH_STRATEGY } from '@kbn/data-plugin/common';
import type { CoreStart } from '@kbn/core/server';
import { ACTIONS_INDEX } from '../../../common/constants';
import { ACTION_RESPONSES_DATA_STREAM_INDEX, ACTIONS_INDEX } from '../../../common/constants';
import type {
FactoryQueryTypes,
StrategyResponseType,
StrategyRequestType,
} from '../../../common/search_strategy/osquery';
import { OsqueryQueries } from '../../../common/search_strategy/osquery';
import { osqueryFactory } from './factory';
import type { OsqueryFactory } from './factory/types';
@ -33,12 +34,15 @@ export const osquerySearchStrategyProvider = <T extends FactoryQueryTypes>(
const queryFactory: OsqueryFactory<T> = osqueryFactory[request.factoryQueryType];
return from(
esClient.asInternalUser.indices.exists({
return forkJoin({
actionsIndexExists: esClient.asInternalUser.indices.exists({
index: `${ACTIONS_INDEX}*`,
})
).pipe(
mergeMap((exists) => {
}),
newDataStreamIndexExists: esClient.asInternalUser.indices.exists({
index: `${ACTION_RESPONSES_DATA_STREAM_INDEX}*`,
}),
}).pipe(
mergeMap(({ actionsIndexExists, newDataStreamIndexExists }) => {
const strictRequest = {
factoryQueryType: request.factoryQueryType,
kuery: request.kuery,
@ -51,7 +55,7 @@ export const osquerySearchStrategyProvider = <T extends FactoryQueryTypes>(
const dsl = queryFactory.buildDsl({
...strictRequest,
componentTemplateExists: exists,
componentTemplateExists: actionsIndexExists,
} as StrategyRequestType<T>);
// use internal user for searching .fleet* indices
es =
@ -59,7 +63,7 @@ export const osquerySearchStrategyProvider = <T extends FactoryQueryTypes>(
? data.search.searchAsInternalUser
: data.search.getSearchStrategy(ENHANCED_ES_SEARCH_STRATEGY);
return es.search(
const searchLegacyIndex$ = es.search(
{
...strictRequest,
params: dsl,
@ -67,15 +71,56 @@ export const osquerySearchStrategyProvider = <T extends FactoryQueryTypes>(
options,
deps
);
}),
map((response) => ({
...response,
...{
rawResponse: shimHitsTotal(response.rawResponse, options),
},
total: response.rawResponse.hits.total as number,
})),
mergeMap((esSearchRes) => queryFactory.parse(request, esSearchRes))
// With the introduction of a new DS that sends data directly from an agent into the new index
// logs-osquery_manager.action.responses-default, instead of the old index .logs-osquery_manager.action.responses-default
// which was populated by a transform, we now need to check both places for results.
// The new index was introduced in integration package 1.12, so users running earlier versions won't have it.
return searchLegacyIndex$.pipe(
mergeMap((legacyIndexResponse) => {
if (
request.factoryQueryType === OsqueryQueries.actionResults &&
newDataStreamIndexExists
) {
const dataStreamDsl = queryFactory.buildDsl({
...strictRequest,
componentTemplateExists: actionsIndexExists,
useNewDataStream: true,
} as StrategyRequestType<T>);
return from(
es.search(
{
...strictRequest,
params: dataStreamDsl,
},
options,
deps
)
).pipe(
map((newDataStreamIndexResponse) => {
if (newDataStreamIndexResponse.rawResponse.hits.total) {
return newDataStreamIndexResponse;
}
return legacyIndexResponse;
})
);
}
return of(legacyIndexResponse);
}),
map((response) => ({
...response,
...{
rawResponse: shimHitsTotal(response.rawResponse, options),
},
total: response.rawResponse.hits.total as number,
})),
mergeMap((esSearchRes) => queryFactory.parse(request, esSearchRes))
);
})
);
},
cancel: async (id, options, deps) => {