small bug fixes that are already in master but that PR had too many conflicts to resolve so just adding bug fixes here (#58544)

This commit is contained in:
Devin W. Hurley 2020-02-25 21:41:53 -05:00 committed by GitHub
parent a0f212d441
commit 4e61c134a6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 301 additions and 240 deletions

View file

@ -17,6 +17,7 @@ import {
IRuleStatusAttributes,
} from '../../rules/types';
import { ruleStatusSavedObjectType } from '../../rules/saved_object_mappings';
import { transformError } from '../utils';
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const convertToSnakeCase = <T extends Record<string, any>>(obj: T): Partial<T> | null => {
@ -59,33 +60,44 @@ export const createFindRulesStatusRoute: Hapi.ServerRoute = {
"anotherAlertId": ...
}
*/
const statuses = await query.ids.reduce<Promise<RuleStatusResponse | {}>>(async (acc, id) => {
const lastFiveErrorsForId = await savedObjectsClient.find<
IRuleSavedAttributesSavedObjectAttributes
>({
type: ruleStatusSavedObjectType,
perPage: 6,
sortField: 'statusDate',
sortOrder: 'desc',
search: id,
searchFields: ['alertId'],
});
const accumulated = await acc;
const currentStatus = convertToSnakeCase<IRuleStatusAttributes>(
lastFiveErrorsForId.saved_objects[0]?.attributes
);
const failures = lastFiveErrorsForId.saved_objects
.slice(1)
.map(errorItem => convertToSnakeCase<IRuleStatusAttributes>(errorItem.attributes));
return {
...accumulated,
[id]: {
current_status: currentStatus,
failures,
},
};
}, Promise.resolve<RuleStatusResponse>({}));
return statuses;
try {
const statuses = await query.ids.reduce<Promise<RuleStatusResponse | {}>>(async (acc, id) => {
const lastFiveErrorsForId = await savedObjectsClient.find<
IRuleSavedAttributesSavedObjectAttributes
>({
type: ruleStatusSavedObjectType,
perPage: 6,
sortField: 'statusDate',
sortOrder: 'desc',
search: id,
searchFields: ['alertId'],
});
const accumulated = await acc;
const currentStatus = convertToSnakeCase<IRuleStatusAttributes>(
lastFiveErrorsForId.saved_objects[0]?.attributes
);
const failures = lastFiveErrorsForId.saved_objects
.slice(1)
.map(errorItem => convertToSnakeCase<IRuleStatusAttributes>(errorItem.attributes));
return {
...accumulated,
[id]: {
current_status: currentStatus,
failures,
},
};
}, Promise.resolve<RuleStatusResponse>({}));
return statuses;
} catch (err) {
const error = transformError(err);
return headers
.response({
message: error.message,
status_code: error.statusCode,
})
.code(error.statusCode);
}
},
};

View file

@ -7,6 +7,7 @@
import Hapi from 'hapi';
import { chunk, isEmpty, isFunction } from 'lodash/fp';
import { extname } from 'path';
import { Readable } from 'stream';
import { createPromiseFromStreams } from '../../../../../../../../../src/legacy/utils/streams';
import { DETECTION_ENGINE_RULES_URL } from '../../../../../common/constants';
import { createRules } from '../../rules/create_rules';
@ -19,6 +20,7 @@ import {
getIndex,
createBulkErrorObject,
ImportRuleResponse,
transformError,
} from '../utils';
import { createRulesStreamFromNdJson } from '../../rules/create_rules_stream_from_ndjson';
import { ImportRuleAlertRest } from '../../types';
@ -72,177 +74,190 @@ export const createImportRulesRoute = (server: ServerFacade): Hapi.ServerRoute =
}
const objectLimit = server.config().get<number>('savedObjects.maxImportExportSize');
const readStream = createRulesStreamFromNdJson(request.payload.file, objectLimit);
const parsedObjects = await createPromiseFromStreams<PromiseFromStreams[]>([readStream]);
const [duplicateIdErrors, uniqueParsedObjects] = getTupleDuplicateErrorsAndUniqueRules(
parsedObjects,
request.query.overwrite
);
const chunkParseObjects = chunk(CHUNK_PARSED_OBJECT_SIZE, uniqueParsedObjects);
let importRuleResponse: ImportRuleResponse[] = [];
while (chunkParseObjects.length) {
const batchParseObjects = chunkParseObjects.shift() ?? [];
const newImportRuleResponse = await Promise.all(
batchParseObjects.reduce<Array<Promise<ImportRuleResponse>>>((accum, parsedRule) => {
const importsWorkerPromise = new Promise<ImportRuleResponse>(
async (resolve, reject) => {
if (parsedRule instanceof Error) {
// If the JSON object had a validation or parse error then we return
// early with the error and an (unknown) for the ruleId
resolve(
createBulkErrorObject({
statusCode: 400,
message: parsedRule.message,
})
);
return null;
}
const {
description,
enabled,
false_positives: falsePositives,
from,
immutable,
query,
language,
output_index: outputIndex,
saved_id: savedId,
meta,
filters,
rule_id: ruleId,
index,
interval,
max_signals: maxSignals,
risk_score: riskScore,
name,
severity,
tags,
threat,
to,
type,
references,
timeline_id: timelineId,
timeline_title: timelineTitle,
version,
} = parsedRule;
try {
const finalIndex = getIndex(request, server);
const callWithRequest = callWithRequestFactory(request, server);
const indexExists = await getIndexExists(callWithRequest, finalIndex);
if (!indexExists) {
resolve(
createBulkErrorObject({
ruleId,
statusCode: 409,
message: `To create a rule, the index must exist first. Index ${finalIndex} does not exist`,
})
);
}
const rule = await readRules({ alertsClient, ruleId });
if (rule == null) {
await createRules({
alertsClient,
actionsClient,
description,
enabled,
falsePositives,
from,
immutable,
query,
language,
outputIndex: finalIndex,
savedId,
timelineId,
timelineTitle,
meta,
filters,
ruleId,
index,
interval,
maxSignals,
riskScore,
name,
severity,
tags,
to,
type,
threat,
references,
version,
});
resolve({ rule_id: ruleId, status_code: 200 });
} else if (rule != null && request.query.overwrite) {
await patchRules({
alertsClient,
actionsClient,
savedObjectsClient,
description,
enabled,
falsePositives,
from,
immutable,
query,
language,
outputIndex,
savedId,
timelineId,
timelineTitle,
meta,
filters,
id: undefined,
ruleId,
index,
interval,
maxSignals,
riskScore,
name,
severity,
tags,
to,
type,
threat,
references,
version,
});
resolve({ rule_id: ruleId, status_code: 200 });
} else if (rule != null) {
resolve(
createBulkErrorObject({
ruleId,
statusCode: 409,
message: `rule_id: "${ruleId}" already exists`,
})
);
}
} catch (err) {
resolve(
createBulkErrorObject({
ruleId,
statusCode: 400,
message: err.message,
})
);
}
}
);
return [...accum, importsWorkerPromise];
}, [])
try {
const readStream = createRulesStreamFromNdJson(objectLimit);
const parsedObjects = await createPromiseFromStreams<PromiseFromStreams[]>([
request.payload.file as Readable,
...readStream,
]);
const [duplicateIdErrors, uniqueParsedObjects] = getTupleDuplicateErrorsAndUniqueRules(
parsedObjects,
request.query.overwrite
);
importRuleResponse = [
...duplicateIdErrors,
...importRuleResponse,
...newImportRuleResponse,
];
}
const errorsResp = importRuleResponse.filter(resp => !isEmpty(resp.error));
return {
success: errorsResp.length === 0,
success_count: importRuleResponse.filter(resp => resp.status_code === 200).length,
errors: errorsResp,
};
const chunkParseObjects = chunk(CHUNK_PARSED_OBJECT_SIZE, uniqueParsedObjects);
let importRuleResponse: ImportRuleResponse[] = [];
while (chunkParseObjects.length) {
const batchParseObjects = chunkParseObjects.shift() ?? [];
const newImportRuleResponse = await Promise.all(
batchParseObjects.reduce<Array<Promise<ImportRuleResponse>>>((accum, parsedRule) => {
const importsWorkerPromise = new Promise<ImportRuleResponse>(
async (resolve, reject) => {
if (parsedRule instanceof Error) {
// If the JSON object had a validation or parse error then we return
// early with the error and an (unknown) for the ruleId
resolve(
createBulkErrorObject({
statusCode: 400,
message: parsedRule.message,
})
);
return null;
}
const {
description,
enabled,
false_positives: falsePositives,
from,
immutable,
query,
language,
output_index: outputIndex,
saved_id: savedId,
meta,
filters,
rule_id: ruleId,
index,
interval,
max_signals: maxSignals,
risk_score: riskScore,
name,
severity,
tags,
threat,
to,
type,
references,
timeline_id: timelineId,
timeline_title: timelineTitle,
version,
} = parsedRule;
try {
const finalIndex = getIndex(request, server);
const callWithRequest = callWithRequestFactory(request, server);
const indexExists = await getIndexExists(callWithRequest, finalIndex);
if (!indexExists) {
resolve(
createBulkErrorObject({
ruleId,
statusCode: 409,
message: `To create a rule, the index must exist first. Index ${finalIndex} does not exist`,
})
);
}
const rule = await readRules({ alertsClient, ruleId });
if (rule == null) {
await createRules({
alertsClient,
actionsClient,
description,
enabled,
falsePositives,
from,
immutable,
query,
language,
outputIndex: finalIndex,
savedId,
timelineId,
timelineTitle,
meta,
filters,
ruleId,
index,
interval,
maxSignals,
riskScore,
name,
severity,
tags,
to,
type,
threat,
references,
version,
});
resolve({ rule_id: ruleId, status_code: 200 });
} else if (rule != null && request.query.overwrite) {
await patchRules({
alertsClient,
actionsClient,
savedObjectsClient,
description,
enabled,
falsePositives,
from,
immutable,
query,
language,
outputIndex,
savedId,
timelineId,
timelineTitle,
meta,
filters,
id: undefined,
ruleId,
index,
interval,
maxSignals,
riskScore,
name,
severity,
tags,
to,
type,
threat,
references,
version,
});
resolve({ rule_id: ruleId, status_code: 200 });
} else if (rule != null) {
resolve(
createBulkErrorObject({
ruleId,
statusCode: 409,
message: `rule_id: "${ruleId}" already exists`,
})
);
}
} catch (err) {
resolve(
createBulkErrorObject({
ruleId,
statusCode: 400,
message: err.message,
})
);
}
}
);
return [...accum, importsWorkerPromise];
}, [])
);
importRuleResponse = [
...duplicateIdErrors,
...importRuleResponse,
...newImportRuleResponse,
];
}
const errorsResp = importRuleResponse.filter(resp => !isEmpty(resp.error));
return {
success: errorsResp.length === 0,
success_count: importRuleResponse.filter(resp => resp.status_code === 200).length,
errors: errorsResp,
};
} catch (exc) {
const error = transformError(exc);
return headers
.response({
message: error.message,
status_code: error.statusCode,
})
.code(error.statusCode);
}
},
};
};

View file

@ -1251,9 +1251,10 @@ describe('utils', () => {
this.push(null);
},
});
const rulesObjectsStream = createRulesStreamFromNdJson(ndJsonStream, 1000);
const rulesObjectsStream = createRulesStreamFromNdJson(1000);
const parsedObjects = await createPromiseFromStreams<PromiseFromStreams[]>([
rulesObjectsStream,
ndJsonStream,
...rulesObjectsStream,
]);
const [errors, output] = getTupleDuplicateErrorsAndUniqueRules(parsedObjects, false);
const isInstanceOfError = output[0] instanceof Error;
@ -1272,9 +1273,10 @@ describe('utils', () => {
this.push(null);
},
});
const rulesObjectsStream = createRulesStreamFromNdJson(ndJsonStream, 1000);
const rulesObjectsStream = createRulesStreamFromNdJson(1000);
const parsedObjects = await createPromiseFromStreams<PromiseFromStreams[]>([
rulesObjectsStream,
ndJsonStream,
...rulesObjectsStream,
]);
const [errors, output] = getTupleDuplicateErrorsAndUniqueRules(parsedObjects, false);
@ -1300,9 +1302,10 @@ describe('utils', () => {
this.push(null);
},
});
const rulesObjectsStream = createRulesStreamFromNdJson(ndJsonStream, 1000);
const rulesObjectsStream = createRulesStreamFromNdJson(1000);
const parsedObjects = await createPromiseFromStreams<PromiseFromStreams[]>([
rulesObjectsStream,
ndJsonStream,
...rulesObjectsStream,
]);
const [errors, output] = getTupleDuplicateErrorsAndUniqueRules(parsedObjects, true);
@ -1320,9 +1323,10 @@ describe('utils', () => {
this.push(null);
},
});
const rulesObjectsStream = createRulesStreamFromNdJson(ndJsonStream, 1000);
const rulesObjectsStream = createRulesStreamFromNdJson(1000);
const parsedObjects = await createPromiseFromStreams<PromiseFromStreams[]>([
rulesObjectsStream,
ndJsonStream,
...rulesObjectsStream,
]);
const [errors, output] = getTupleDuplicateErrorsAndUniqueRules(parsedObjects, false);
const isInstanceOfError = output[0] instanceof Error;

View file

@ -180,9 +180,7 @@ export const transform = (
if (!ruleStatus && isAlertType(alert)) {
return transformAlertToRule(alert);
}
if (isAlertType(alert) && isRuleStatusFindType(ruleStatus)) {
return transformAlertToRule(alert, ruleStatus.saved_objects[0]);
} else if (isAlertType(alert) && isRuleStatusSavedObjectType(ruleStatus)) {
if (isAlertType(alert) && isRuleStatusSavedObjectType(ruleStatus)) {
return transformAlertToRule(alert, ruleStatus);
} else {
return null;
@ -195,7 +193,7 @@ export const transformOrBulkError = (
ruleStatus?: unknown
): Partial<OutputRuleAlertRest> | BulkError => {
if (isAlertType(alert)) {
if (isRuleStatusFindType(ruleStatus)) {
if (isRuleStatusFindType(ruleStatus) && ruleStatus?.saved_objects.length > 0) {
return transformAlertToRule(alert, ruleStatus?.saved_objects[0] ?? ruleStatus);
} else {
return transformAlertToRule(alert);

View file

@ -24,20 +24,27 @@ export const querySignalsRouteDef = (server: ServerFacade): Hapi.ServerRoute =>
payload: querySignalsSchema,
},
},
async handler(request: SignalsQueryRequest) {
async handler(request: SignalsQueryRequest, headers) {
const { query, aggs, _source, track_total_hits, size } = request.payload;
const index = getIndex(request, server);
const { callWithRequest } = server.plugins.elasticsearch.getCluster('data');
try {
return callWithRequest(request, 'search', {
const searchSignalsIndexResult = await callWithRequest(request, 'search', {
index,
body: { query, aggs, _source, track_total_hits, size },
ignoreUnavailable: true,
});
return searchSignalsIndexResult;
} catch (exc) {
// error while getting or updating signal with id: id in signal index .siem-signals
return transformError(exc);
const error = transformError(exc);
return headers
.response({
message: error.message,
status_code: error.statusCode,
})
.code(error.statusCode);
}
},
};

View file

@ -5,12 +5,10 @@
*/
import { Readable } from 'stream';
import { createRulesStreamFromNdJson } from './create_rules_stream_from_ndjson';
import { createPromiseFromStreams, createConcatStream } from 'src/legacy/utils/streams';
import { createPromiseFromStreams } from 'src/legacy/utils/streams';
import { ImportRuleAlertRest } from '../types';
const readStreamToCompletion = (stream: Readable) => {
return createPromiseFromStreams([stream, createConcatStream([])]);
};
type PromiseFromStreams = ImportRuleAlertRest | Error;
export const getOutputSample = (): Partial<ImportRuleAlertRest> => ({
rule_id: 'rule-1',
@ -43,8 +41,11 @@ describe('create_rules_stream_from_ndjson', () => {
this.push(null);
},
});
const rulesObjectsStream = createRulesStreamFromNdJson(ndJsonStream, 1000);
const result = await readStreamToCompletion(rulesObjectsStream);
const rulesObjectsStream = createRulesStreamFromNdJson(1000);
const result = await createPromiseFromStreams<PromiseFromStreams[]>([
ndJsonStream,
...rulesObjectsStream,
]);
expect(result).toEqual([
{
rule_id: 'rule-1',
@ -108,8 +109,11 @@ describe('create_rules_stream_from_ndjson', () => {
this.push(null);
},
});
const rulesObjectsStream = createRulesStreamFromNdJson(ndJsonStream, 1000);
const result = await readStreamToCompletion(rulesObjectsStream);
const rulesObjectsStream = createRulesStreamFromNdJson(1000);
const result = await createPromiseFromStreams<PromiseFromStreams[]>([
ndJsonStream,
...rulesObjectsStream,
]);
expect(result).toEqual([
{
rule_id: 'rule-1',
@ -172,8 +176,11 @@ describe('create_rules_stream_from_ndjson', () => {
this.push(null);
},
});
const rulesObjectsStream = createRulesStreamFromNdJson(ndJsonStream, 1000);
const result = await readStreamToCompletion(rulesObjectsStream);
const rulesObjectsStream = createRulesStreamFromNdJson(1000);
const result = await createPromiseFromStreams<PromiseFromStreams[]>([
ndJsonStream,
...rulesObjectsStream,
]);
expect(result).toEqual([
{
rule_id: 'rule-1',
@ -236,8 +243,11 @@ describe('create_rules_stream_from_ndjson', () => {
this.push(null);
},
});
const rulesObjectsStream = createRulesStreamFromNdJson(ndJsonStream, 1000);
const result = await readStreamToCompletion(rulesObjectsStream);
const rulesObjectsStream = createRulesStreamFromNdJson(1000);
const result = await createPromiseFromStreams<PromiseFromStreams[]>([
ndJsonStream,
...rulesObjectsStream,
]);
const resultOrError = result as Error[];
expect(resultOrError[0]).toEqual({
rule_id: 'rule-1',
@ -300,8 +310,11 @@ describe('create_rules_stream_from_ndjson', () => {
this.push(null);
},
});
const rulesObjectsStream = createRulesStreamFromNdJson(ndJsonStream, 1000);
const result = await readStreamToCompletion(rulesObjectsStream);
const rulesObjectsStream = createRulesStreamFromNdJson(1000);
const result = await createPromiseFromStreams<PromiseFromStreams[]>([
ndJsonStream,
...rulesObjectsStream,
]);
const resultOrError = result as TypeError[];
expect(resultOrError[0]).toEqual({
rule_id: 'rule-1',
@ -366,8 +379,11 @@ describe('create_rules_stream_from_ndjson', () => {
this.push(null);
},
});
const rulesObjectsStream = createRulesStreamFromNdJson(ndJsonStream, 1000);
const result = await readStreamToCompletion(rulesObjectsStream);
const rulesObjectsStream = createRulesStreamFromNdJson(1000);
const result = await createPromiseFromStreams<PromiseFromStreams[]>([
ndJsonStream,
...rulesObjectsStream,
]);
const resultOrError = result as TypeError[];
expect(resultOrError[1] instanceof TypeError).toEqual(true);
});

View file

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { Readable, Transform } from 'stream';
import { Transform } from 'stream';
import { has, isString } from 'lodash/fp';
import { ImportRuleAlertRest } from '../types';
import {
@ -74,15 +74,13 @@ export const createLimitStream = (limit: number): Transform => {
* Inspiration and the pattern of code followed is from:
* saved_objects/lib/create_saved_objects_stream_from_ndjson.ts
*/
export const createRulesStreamFromNdJson = (
ndJsonStream: Readable,
ruleLimit: number
): Transform => {
return ndJsonStream
.pipe(createSplitStream('\n'))
.pipe(parseNdjsonStrings())
.pipe(filterExportedCounts())
.pipe(validateRules())
.pipe(createLimitStream(ruleLimit))
.pipe(createConcatStream([]));
export const createRulesStreamFromNdJson = (ruleLimit: number) => {
return [
createSplitStream('\n'),
parseNdjsonStrings(),
filterExportedCounts(),
validateRules(),
createLimitStream(ruleLimit),
createConcatStream([]),
];
};

View file

@ -31,7 +31,7 @@ export const readRules = async ({
return null;
}
} catch (err) {
if (err.output.statusCode === 404) {
if (err?.output?.statusCode === 404) {
return null;
} else {
// throw non-404 as they would be 500 or other internal errors

View file

@ -74,6 +74,17 @@ export default ({ getService }: FtrProviderContext): void => {
});
});
it('should report that it failed to import a thousand and one (10001) simple rules', async () => {
const { body } = await supertest
.post(`${DETECTION_ENGINE_RULES_URL}/_import`)
.set('kbn-xsrf', 'true')
.attach('file', getSimpleRuleAsNdjson(new Array(10001).fill('rule-1')), 'rules.ndjson')
.query()
.expect(500);
expect(body).to.eql({ message: "Can't import more than 10000 rules", status_code: 500 });
});
it('should be able to read an imported rule back out correctly', async () => {
await supertest
.post(`${DETECTION_ENGINE_RULES_URL}/_import`)