[Attack discovery] Optimize attack discovery test data (#206885)

## Summary

Follow-up to https://github.com/elastic/kibana/pull/182918: the test data is now gzip-compressed, and the loader reads the `.ndjson.gz` files directly.
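
For context, the `.ndjson` fixtures can be gzipped ahead of time with Node's built-in `zlib`. A minimal sketch (the file name and helper below are illustrative, not part of this PR):

```ts
import fs from 'fs';
import { createGzip } from 'zlib';
import { pipeline } from 'stream/promises';

// Compress an .ndjson fixture to .ndjson.gz (illustrative paths; level 9
// favors size over speed, which suits static test data).
async function gzipFixture(src: string): Promise<void> {
  await pipeline(
    fs.createReadStream(src),
    createGzip({ level: 9 }),
    fs.createWriteStream(`${src}.gz`)
  );
}

gzipFixture('ep1data.ndjson').catch(console.error);
```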
Commit 67bedde849 (parent 9a3fc89629)
Patryk Kopyciński, 2025-01-24 19:15:07 +01:00, committed by GitHub
33 changed files with 38 additions and 107928 deletions


@@ -10,6 +10,11 @@ import path from 'path';
 import type { Client } from '@elastic/elasticsearch';
 import type { ToolingLog } from '@kbn/tooling-log';
 import type { KbnClient } from '@kbn/test';
+import { createGunzip } from 'zlib';
+import { pipeline } from 'stream';
+import { promisify } from 'util';
+
+const pipelineAsync = promisify(pipeline);
 
 const PIPELINE_NAME = 'insights_pipeline';
 const DIRECTORY_PATH = path.resolve(
@@ -63,6 +68,36 @@ const getRule = async ({ kbnClient, log }: { kbnClient: KbnClient; log: ToolingLog
   return response.data.data?.[0];
 };
 
+async function readAndDecompress({ filePath, log }: { filePath: string; log: ToolingLog }) {
+  try {
+    const decompressedChunks: Uint8Array[] = [];
+
+    // Create a read stream for the gzipped file
+    const fileStream = fs.createReadStream(filePath);
+
+    // Decompress the file stream using zlib
+    await pipelineAsync(
+      fileStream, // Readable stream for the file
+      createGunzip(), // Decompression stream
+      async function* (source) {
+        // Collect decompressed chunks
+        for await (const chunk of source) {
+          decompressedChunks.push(chunk);
+        }
+      }
+    );
+
+    // Combine decompressed chunks into a single buffer and convert to string
+    const decompressedBuffer = Buffer.concat(decompressedChunks);
+    const decompressedText = decompressedBuffer.toString('utf-8');
+
+    return decompressedText;
+  } catch (error) {
+    log.error('Error during file reading or decompression:');
+    log.error(error);
+  }
+}
+
 const importRule = async ({ kbnClient, log }: { kbnClient: KbnClient; log: ToolingLog }) => {
   log.info('Importing rule from endpoint_alert.ndjson...');
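
Note that the streaming variant above still buffers every decompressed chunk before concatenating, so for fixtures of this size a one-shot synchronous read is an equivalent, simpler alternative (a sketch, not part of this commit):

```ts
import fs from 'fs';
import { gunzipSync } from 'zlib';

// One-shot variant: decompress the whole .ndjson.gz file in memory.
function readAndDecompressSync(filePath: string): string {
  return gunzipSync(fs.readFileSync(filePath)).toString('utf-8');
}
```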
@@ -201,7 +236,7 @@ const processFile = async ({
   log.info(`Processing and indexing file: ${file} ...`);
 
-  const fileData = await fs.readFileSync(file).toString().split('\n');
+  const fileData = (await readAndDecompress({ filePath: file, log }))?.split('\n') ?? [];
 
   try {
     const response = await esClient.bulk<string>({
@@ -237,10 +272,10 @@ const processFilesForEpisode = async ({
 }) => {
   const dataFiles = fs
     .readdirSync(DIRECTORY_PATH)
-    .filter((file) => file.includes(`ep${epNum}data.ndjson`));
+    .filter((file) => file.includes(`ep${epNum}data.ndjson.gz`));
 
   const alertFiles = fs
     .readdirSync(DIRECTORY_PATH)
-    .filter((file) => file.includes(`ep${epNum}alerts.ndjson`));
+    .filter((file) => file.includes(`ep${epNum}alerts.ndjson.gz`));
 
   for (const file of dataFiles) {
     await processFile({ esClient, file: path.join(DIRECTORY_PATH, file), indexType: 'data', log });