mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
[Reporting] ReportingStore module (#69426)
* Add store class * fix tests * fix the createIndex bug * add reportingstore test * change function args * nits * add test for automatic index creation failure recovery
This commit is contained in:
parent
83da3a8b4d
commit
e143905223
21 changed files with 665 additions and 841 deletions
|
@ -24,6 +24,7 @@ import { screenshotsObservableFactory } from './export_types/common/lib/screensh
|
|||
import { checkLicense, getExportTypesRegistry } from './lib';
|
||||
import { ESQueueInstance } from './lib/create_queue';
|
||||
import { EnqueueJobFn } from './lib/enqueue_job';
|
||||
import { ReportingStore } from './lib/store';
|
||||
|
||||
export interface ReportingInternalSetup {
|
||||
elasticsearch: ElasticsearchServiceSetup;
|
||||
|
@ -37,6 +38,7 @@ export interface ReportingInternalStart {
|
|||
browserDriverFactory: HeadlessChromiumDriverFactory;
|
||||
enqueueJob: EnqueueJobFn;
|
||||
esqueue: ESQueueInstance;
|
||||
store: ReportingStore;
|
||||
savedObjects: SavedObjectsServiceStart;
|
||||
uiSettings: UiSettingsServiceStart;
|
||||
}
|
||||
|
|
|
@ -8,17 +8,16 @@ import { ReportingCore } from '../core';
|
|||
import { JobSource, TaskRunResult } from '../types';
|
||||
import { createTaggedLogger } from './create_tagged_logger'; // TODO remove createTaggedLogger once esqueue is removed
|
||||
import { createWorkerFactory } from './create_worker';
|
||||
import { Job } from './enqueue_job';
|
||||
// @ts-ignore
|
||||
import { Esqueue } from './esqueue';
|
||||
import { LevelLogger } from './level_logger';
|
||||
import { ReportingStore } from './store';
|
||||
|
||||
interface ESQueueWorker {
|
||||
on: (event: string, handler: any) => void;
|
||||
}
|
||||
|
||||
export interface ESQueueInstance {
|
||||
addJob: (type: string, payload: unknown, options: object) => Job;
|
||||
registerWorker: <JobParamsType>(
|
||||
pluginId: string,
|
||||
workerFn: GenericWorkerFn<JobParamsType>,
|
||||
|
@ -37,26 +36,25 @@ type GenericWorkerFn<JobParamsType> = (
|
|||
...workerRestArgs: any[]
|
||||
) => void | Promise<TaskRunResult>;
|
||||
|
||||
export async function createQueueFactory<JobParamsType, JobPayloadType>(
|
||||
export async function createQueueFactory(
|
||||
reporting: ReportingCore,
|
||||
store: ReportingStore,
|
||||
logger: LevelLogger
|
||||
): Promise<ESQueueInstance> {
|
||||
const config = reporting.getConfig();
|
||||
const queueIndexInterval = config.get('queue', 'indexInterval');
|
||||
|
||||
// esqueue-related
|
||||
const queueTimeout = config.get('queue', 'timeout');
|
||||
const queueIndex = config.get('index');
|
||||
const isPollingEnabled = config.get('queue', 'pollEnabled');
|
||||
|
||||
const elasticsearch = await reporting.getElasticsearchService();
|
||||
const elasticsearch = reporting.getElasticsearchService();
|
||||
const queueOptions = {
|
||||
interval: queueIndexInterval,
|
||||
timeout: queueTimeout,
|
||||
dateSeparator: '.',
|
||||
client: elasticsearch.legacy.client,
|
||||
logger: createTaggedLogger(logger, ['esqueue', 'queue-worker']),
|
||||
};
|
||||
|
||||
const queue: ESQueueInstance = new Esqueue(queueIndex, queueOptions);
|
||||
const queue: ESQueueInstance = new Esqueue(store, queueOptions);
|
||||
|
||||
if (isPollingEnabled) {
|
||||
// create workers to poll the index for idle jobs waiting to be claimed and executed
|
||||
|
|
|
@ -4,39 +4,24 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { EventEmitter } from 'events';
|
||||
import { KibanaRequest, RequestHandlerContext } from 'src/core/server';
|
||||
import { AuthenticatedUser } from '../../../security/server';
|
||||
import { ESQueueCreateJobFn } from '../../server/types';
|
||||
import { ReportingCore } from '../core';
|
||||
// @ts-ignore
|
||||
import { events as esqueueEvents } from './esqueue';
|
||||
import { LevelLogger } from './level_logger';
|
||||
import { LevelLogger } from './';
|
||||
import { ReportingStore, Report } from './store';
|
||||
|
||||
interface ConfirmedJob {
|
||||
id: string;
|
||||
index: string;
|
||||
_seq_no: number;
|
||||
_primary_term: number;
|
||||
}
|
||||
|
||||
export type Job = EventEmitter & {
|
||||
id: string;
|
||||
toJSON: () => {
|
||||
id: string;
|
||||
};
|
||||
};
|
||||
|
||||
export type EnqueueJobFn = <JobParamsType>(
|
||||
export type EnqueueJobFn = (
|
||||
exportTypeId: string,
|
||||
jobParams: JobParamsType,
|
||||
jobParams: unknown,
|
||||
user: AuthenticatedUser | null,
|
||||
context: RequestHandlerContext,
|
||||
request: KibanaRequest
|
||||
) => Promise<Job>;
|
||||
) => Promise<Report>;
|
||||
|
||||
export function enqueueJobFactory(
|
||||
reporting: ReportingCore,
|
||||
store: ReportingStore,
|
||||
parentLogger: LevelLogger
|
||||
): EnqueueJobFn {
|
||||
const config = reporting.getConfig();
|
||||
|
@ -45,16 +30,16 @@ export function enqueueJobFactory(
|
|||
const maxAttempts = config.get('capture', 'maxAttempts');
|
||||
const logger = parentLogger.clone(['queue-job']);
|
||||
|
||||
return async function enqueueJob<JobParamsType>(
|
||||
return async function enqueueJob(
|
||||
exportTypeId: string,
|
||||
jobParams: JobParamsType,
|
||||
jobParams: unknown,
|
||||
user: AuthenticatedUser | null,
|
||||
context: RequestHandlerContext,
|
||||
request: KibanaRequest
|
||||
): Promise<Job> {
|
||||
type ScheduleTaskFnType = ESQueueCreateJobFn<JobParamsType>;
|
||||
) {
|
||||
type ScheduleTaskFnType = ESQueueCreateJobFn<unknown>;
|
||||
|
||||
const username = user ? user.username : false;
|
||||
const esqueue = await reporting.getEsqueue();
|
||||
const exportType = reporting.getExportTypesRegistry().getById(exportTypeId);
|
||||
|
||||
if (exportType == null) {
|
||||
|
@ -71,16 +56,6 @@ export function enqueueJobFactory(
|
|||
max_attempts: maxAttempts,
|
||||
};
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const job = esqueue.addJob(exportType.jobType, payload, options);
|
||||
|
||||
job.on(esqueueEvents.EVENT_JOB_CREATED, (createdJob: ConfirmedJob) => {
|
||||
if (createdJob.id === job.id) {
|
||||
logger.info(`Successfully queued job: ${createdJob.id}`);
|
||||
resolve(job);
|
||||
}
|
||||
});
|
||||
job.on(esqueueEvents.EVENT_JOB_CREATE_ERROR, reject);
|
||||
});
|
||||
return await store.addReport(exportType.jobType, payload, options);
|
||||
};
|
||||
}
|
||||
|
|
|
@ -1,100 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import expect from '@kbn/expect';
|
||||
import sinon from 'sinon';
|
||||
import { createIndex } from '../../helpers/create_index';
|
||||
import { ClientMock } from '../fixtures/legacy_elasticsearch';
|
||||
import { constants } from '../../constants';
|
||||
|
||||
describe('Create Index', function () {
|
||||
describe('Does not exist', function () {
|
||||
let client;
|
||||
let createSpy;
|
||||
|
||||
beforeEach(function () {
|
||||
client = new ClientMock();
|
||||
createSpy = sinon.spy(client, 'callAsInternalUser').withArgs('indices.create');
|
||||
});
|
||||
|
||||
it('should return true', function () {
|
||||
const indexName = 'test-index';
|
||||
const result = createIndex(client, indexName);
|
||||
|
||||
return result.then((exists) => expect(exists).to.be(true));
|
||||
});
|
||||
|
||||
it('should create the index with mappings and default settings', function () {
|
||||
const indexName = 'test-index';
|
||||
const settings = constants.DEFAULT_SETTING_INDEX_SETTINGS;
|
||||
const result = createIndex(client, indexName);
|
||||
|
||||
return result.then(function () {
|
||||
const payload = createSpy.getCall(0).args[1];
|
||||
sinon.assert.callCount(createSpy, 1);
|
||||
expect(payload).to.have.property('index', indexName);
|
||||
expect(payload).to.have.property('body');
|
||||
expect(payload.body).to.have.property('settings');
|
||||
expect(payload.body.settings).to.eql(settings);
|
||||
expect(payload.body).to.have.property('mappings');
|
||||
expect(payload.body.mappings).to.have.property('properties');
|
||||
});
|
||||
});
|
||||
|
||||
it('should create the index with custom settings', function () {
|
||||
const indexName = 'test-index';
|
||||
const settings = {
|
||||
...constants.DEFAULT_SETTING_INDEX_SETTINGS,
|
||||
auto_expand_replicas: false,
|
||||
number_of_shards: 3000,
|
||||
number_of_replicas: 1,
|
||||
format: '3000',
|
||||
};
|
||||
const result = createIndex(client, indexName, settings);
|
||||
|
||||
return result.then(function () {
|
||||
const payload = createSpy.getCall(0).args[1];
|
||||
sinon.assert.callCount(createSpy, 1);
|
||||
expect(payload).to.have.property('index', indexName);
|
||||
expect(payload).to.have.property('body');
|
||||
expect(payload.body).to.have.property('settings');
|
||||
expect(payload.body.settings).to.eql(settings);
|
||||
expect(payload.body).to.have.property('mappings');
|
||||
expect(payload.body.mappings).to.have.property('properties');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('Does exist', function () {
|
||||
let client;
|
||||
let createSpy;
|
||||
|
||||
beforeEach(function () {
|
||||
client = new ClientMock();
|
||||
sinon
|
||||
.stub(client, 'callAsInternalUser')
|
||||
.withArgs('indices.exists')
|
||||
.callsFake(() => Promise.resolve(true));
|
||||
createSpy = client.callAsInternalUser.withArgs('indices.create');
|
||||
});
|
||||
|
||||
it('should return true', function () {
|
||||
const indexName = 'test-index';
|
||||
const result = createIndex(client, indexName);
|
||||
|
||||
return result.then((exists) => expect(exists).to.be(true));
|
||||
});
|
||||
|
||||
it('should not create the index', function () {
|
||||
const indexName = 'test-index';
|
||||
const result = createIndex(client, indexName);
|
||||
|
||||
return result.then(function () {
|
||||
sinon.assert.callCount(createSpy, 0);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
|
@ -1,93 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import expect from '@kbn/expect';
|
||||
import sinon from 'sinon';
|
||||
import moment from 'moment';
|
||||
import { constants } from '../../constants';
|
||||
import { indexTimestamp } from '../../helpers/index_timestamp';
|
||||
|
||||
const anchor = '2016-04-02T01:02:03.456'; // saturday
|
||||
|
||||
describe('Index timestamp interval', function () {
|
||||
describe('construction', function () {
|
||||
it('should throw given an invalid interval', function () {
|
||||
const init = () => indexTimestamp('bananas');
|
||||
expect(init).to.throwException(/invalid.+interval/i);
|
||||
});
|
||||
});
|
||||
|
||||
describe('timestamps', function () {
|
||||
let clock;
|
||||
let separator;
|
||||
|
||||
beforeEach(function () {
|
||||
separator = constants.DEFAULT_SETTING_DATE_SEPARATOR;
|
||||
clock = sinon.useFakeTimers(moment(anchor).valueOf());
|
||||
});
|
||||
|
||||
afterEach(function () {
|
||||
clock.restore();
|
||||
});
|
||||
|
||||
describe('formats', function () {
|
||||
it('should return the year', function () {
|
||||
const timestamp = indexTimestamp('year');
|
||||
const str = `2016`;
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
|
||||
it('should return the year and month', function () {
|
||||
const timestamp = indexTimestamp('month');
|
||||
const str = `2016${separator}04`;
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
|
||||
it('should return the year, month, and first day of the week', function () {
|
||||
const timestamp = indexTimestamp('week');
|
||||
const str = `2016${separator}03${separator}27`;
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
|
||||
it('should return the year, month, and day of the week', function () {
|
||||
const timestamp = indexTimestamp('day');
|
||||
const str = `2016${separator}04${separator}02`;
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
|
||||
it('should return the year, month, day and hour', function () {
|
||||
const timestamp = indexTimestamp('hour');
|
||||
const str = `2016${separator}04${separator}02${separator}01`;
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
|
||||
it('should return the year, month, day, hour and minute', function () {
|
||||
const timestamp = indexTimestamp('minute');
|
||||
const str = `2016${separator}04${separator}02${separator}01${separator}02`;
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
});
|
||||
|
||||
describe('date separator', function () {
|
||||
it('should be customizable', function () {
|
||||
const separators = ['-', '.', '_'];
|
||||
separators.forEach((customSep) => {
|
||||
const str = `2016${customSep}04${customSep}02${customSep}01${customSep}02`;
|
||||
const timestamp = indexTimestamp('minute', customSep);
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
});
|
||||
|
||||
it('should throw if a letter is used', function () {
|
||||
const separators = ['a', 'B', 'YYYY'];
|
||||
separators.forEach((customSep) => {
|
||||
const fn = () => indexTimestamp('minute', customSep);
|
||||
expect(fn).to.throwException();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
|
@ -1,420 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import events from 'events';
|
||||
import expect from '@kbn/expect';
|
||||
import sinon from 'sinon';
|
||||
import proxyquire from 'proxyquire';
|
||||
import { QueueMock } from './fixtures/queue';
|
||||
import { ClientMock } from './fixtures/legacy_elasticsearch';
|
||||
import { constants } from '../constants';
|
||||
|
||||
const createIndexMock = sinon.stub();
|
||||
const { Job } = proxyquire.noPreserveCache()('../job', {
|
||||
'./helpers/create_index': { createIndex: createIndexMock },
|
||||
});
|
||||
|
||||
const maxPriority = 20;
|
||||
const minPriority = -20;
|
||||
const defaultPriority = 10;
|
||||
const defaultCreatedBy = false;
|
||||
|
||||
function validateDoc(spy) {
|
||||
sinon.assert.callCount(spy, 1);
|
||||
const spyCall = spy.getCall(0);
|
||||
return spyCall.args[1];
|
||||
}
|
||||
|
||||
describe('Job Class', function () {
|
||||
let mockQueue;
|
||||
let client;
|
||||
let index;
|
||||
|
||||
let type;
|
||||
let payload;
|
||||
let options;
|
||||
|
||||
beforeEach(function () {
|
||||
createIndexMock.resetHistory();
|
||||
createIndexMock.returns(Promise.resolve('mock'));
|
||||
index = 'test';
|
||||
|
||||
client = new ClientMock();
|
||||
mockQueue = new QueueMock();
|
||||
mockQueue.setClient(client);
|
||||
});
|
||||
|
||||
it('should be an event emitter', function () {
|
||||
const job = new Job(mockQueue, index, 'test', {});
|
||||
expect(job).to.be.an(events.EventEmitter);
|
||||
});
|
||||
|
||||
describe('invalid construction', function () {
|
||||
it('should throw with a missing type', function () {
|
||||
const init = () => new Job(mockQueue, index);
|
||||
expect(init).to.throwException(/type.+string/i);
|
||||
});
|
||||
|
||||
it('should throw with an invalid type', function () {
|
||||
const init = () => new Job(mockQueue, index, { 'not a string': true });
|
||||
expect(init).to.throwException(/type.+string/i);
|
||||
});
|
||||
|
||||
it('should throw with an invalid payload', function () {
|
||||
const init = () => new Job(mockQueue, index, 'type1', [1, 2, 3]);
|
||||
expect(init).to.throwException(/plain.+object/i);
|
||||
});
|
||||
|
||||
it(`should throw error if invalid maxAttempts`, function () {
|
||||
const init = () => new Job(mockQueue, index, 'type1', { id: '123' }, { max_attempts: -1 });
|
||||
expect(init).to.throwException(/invalid.+max_attempts/i);
|
||||
});
|
||||
});
|
||||
|
||||
describe('construction', function () {
|
||||
let indexSpy;
|
||||
beforeEach(function () {
|
||||
type = 'type1';
|
||||
payload = { id: '123' };
|
||||
indexSpy = sinon.spy(client, 'callAsInternalUser').withArgs('index');
|
||||
});
|
||||
|
||||
it('should create the target index', function () {
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
sinon.assert.calledOnce(createIndexMock);
|
||||
const args = createIndexMock.getCall(0).args;
|
||||
expect(args[0]).to.equal(client);
|
||||
expect(args[1]).to.equal(index);
|
||||
});
|
||||
});
|
||||
|
||||
it('should index the payload', function () {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(indexSpy);
|
||||
expect(indexArgs).to.have.property('index', index);
|
||||
expect(indexArgs).to.have.property('body');
|
||||
expect(indexArgs.body).to.have.property('payload', payload);
|
||||
});
|
||||
});
|
||||
|
||||
it('should index the job type', function () {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(indexSpy);
|
||||
expect(indexArgs).to.have.property('index', index);
|
||||
expect(indexArgs).to.have.property('body');
|
||||
expect(indexArgs.body).to.have.property('jobtype', type);
|
||||
});
|
||||
});
|
||||
|
||||
it('should set event creation time', function () {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(indexSpy);
|
||||
expect(indexArgs.body).to.have.property('created_at');
|
||||
});
|
||||
});
|
||||
|
||||
it('should refresh the index', function () {
|
||||
const refreshSpy = client.callAsInternalUser.withArgs('indices.refresh');
|
||||
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
sinon.assert.calledOnce(refreshSpy);
|
||||
const spyCall = refreshSpy.getCall(0);
|
||||
expect(spyCall.args[1]).to.have.property('index', index);
|
||||
});
|
||||
});
|
||||
|
||||
it('should emit the job information on success', function (done) {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
job.once(constants.EVENT_JOB_CREATED, (jobDoc) => {
|
||||
try {
|
||||
expect(jobDoc).to.have.property('id');
|
||||
expect(jobDoc).to.have.property('index');
|
||||
expect(jobDoc).to.have.property('_seq_no');
|
||||
expect(jobDoc).to.have.property('_primary_term');
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
it('should emit error on index creation failure', function (done) {
|
||||
const errMsg = 'test index creation failure';
|
||||
|
||||
createIndexMock.returns(Promise.reject(new Error(errMsg)));
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
|
||||
job.once(constants.EVENT_JOB_CREATE_ERROR, (err) => {
|
||||
try {
|
||||
expect(err.message).to.equal(errMsg);
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
it('should emit error on client index failure', function (done) {
|
||||
const errMsg = 'test document index failure';
|
||||
|
||||
client.callAsInternalUser.restore();
|
||||
sinon
|
||||
.stub(client, 'callAsInternalUser')
|
||||
.withArgs('index')
|
||||
.callsFake(() => Promise.reject(new Error(errMsg)));
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
|
||||
job.once(constants.EVENT_JOB_CREATE_ERROR, (err) => {
|
||||
try {
|
||||
expect(err.message).to.equal(errMsg);
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('event emitting', function () {
|
||||
it('should trigger events on the queue instance', function (done) {
|
||||
const eventName = 'test event';
|
||||
const payload1 = {
|
||||
test: true,
|
||||
deep: { object: 'ok' },
|
||||
};
|
||||
const payload2 = 'two';
|
||||
const payload3 = new Error('test error');
|
||||
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
|
||||
mockQueue.on(eventName, (...args) => {
|
||||
try {
|
||||
expect(args[0]).to.equal(payload1);
|
||||
expect(args[1]).to.equal(payload2);
|
||||
expect(args[2]).to.equal(payload3);
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
|
||||
job.emit(eventName, payload1, payload2, payload3);
|
||||
});
|
||||
});
|
||||
|
||||
describe('default values', function () {
|
||||
let indexSpy;
|
||||
beforeEach(function () {
|
||||
type = 'type1';
|
||||
payload = { id: '123' };
|
||||
indexSpy = sinon.spy(client, 'callAsInternalUser').withArgs('index');
|
||||
});
|
||||
|
||||
it('should set attempt count to 0', function () {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(indexSpy);
|
||||
expect(indexArgs.body).to.have.property('attempts', 0);
|
||||
});
|
||||
});
|
||||
|
||||
it('should index default created_by value', function () {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(indexSpy);
|
||||
expect(indexArgs.body).to.have.property('created_by', defaultCreatedBy);
|
||||
});
|
||||
});
|
||||
|
||||
it('should set an expired process_expiration time', function () {
|
||||
const now = new Date().getTime();
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(indexSpy);
|
||||
expect(indexArgs.body).to.have.property('process_expiration');
|
||||
expect(indexArgs.body.process_expiration.getTime()).to.be.lessThan(now);
|
||||
});
|
||||
});
|
||||
|
||||
it('should set status as pending', function () {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(indexSpy);
|
||||
expect(indexArgs.body).to.have.property('status', constants.JOB_STATUS_PENDING);
|
||||
});
|
||||
});
|
||||
|
||||
it('should have a default priority of 10', function () {
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(indexSpy);
|
||||
expect(indexArgs.body).to.have.property('priority', defaultPriority);
|
||||
});
|
||||
});
|
||||
|
||||
it('should set a browser type', function () {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(indexSpy);
|
||||
expect(indexArgs.body).to.have.property('browser_type');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('option passing', function () {
|
||||
let indexSpy;
|
||||
beforeEach(function () {
|
||||
type = 'type1';
|
||||
payload = { id: '123' };
|
||||
options = {
|
||||
timeout: 4567,
|
||||
max_attempts: 9,
|
||||
headers: {
|
||||
authorization: 'Basic cXdlcnR5',
|
||||
},
|
||||
};
|
||||
indexSpy = sinon.spy(client, 'callAsInternalUser').withArgs('index');
|
||||
});
|
||||
|
||||
it('should index the created_by value', function () {
|
||||
const createdBy = 'user_identifier';
|
||||
const job = new Job(mockQueue, index, type, payload, {
|
||||
created_by: createdBy,
|
||||
...options,
|
||||
});
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(indexSpy);
|
||||
expect(indexArgs.body).to.have.property('created_by', createdBy);
|
||||
});
|
||||
});
|
||||
|
||||
it('should index timeout value from options', function () {
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(indexSpy);
|
||||
expect(indexArgs.body).to.have.property('timeout', options.timeout);
|
||||
});
|
||||
});
|
||||
|
||||
it('should set max attempt count', function () {
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(indexSpy);
|
||||
expect(indexArgs.body).to.have.property('max_attempts', options.max_attempts);
|
||||
});
|
||||
});
|
||||
|
||||
it('should add headers to the request params', function () {
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(indexSpy);
|
||||
expect(indexArgs).to.have.property('headers', options.headers);
|
||||
});
|
||||
});
|
||||
|
||||
it(`should use upper priority of ${maxPriority}`, function () {
|
||||
const job = new Job(mockQueue, index, type, payload, { priority: maxPriority * 2 });
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(indexSpy);
|
||||
expect(indexArgs.body).to.have.property('priority', maxPriority);
|
||||
});
|
||||
});
|
||||
|
||||
it(`should use lower priority of ${minPriority}`, function () {
|
||||
const job = new Job(mockQueue, index, type, payload, { priority: minPriority * 2 });
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(indexSpy);
|
||||
expect(indexArgs.body).to.have.property('priority', minPriority);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('get method', function () {
|
||||
beforeEach(function () {
|
||||
type = 'type2';
|
||||
payload = { id: '123' };
|
||||
});
|
||||
|
||||
it('should return the job document', function () {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
|
||||
return job.get().then((doc) => {
|
||||
const jobDoc = job.document; // document should be resolved
|
||||
expect(doc).to.have.property('index', index);
|
||||
expect(doc).to.have.property('id', jobDoc.id);
|
||||
expect(doc).to.have.property('_seq_no', jobDoc._seq_no);
|
||||
expect(doc).to.have.property('_primary_term', jobDoc._primary_term);
|
||||
expect(doc).to.have.property('created_by', defaultCreatedBy);
|
||||
|
||||
expect(doc).to.have.property('payload');
|
||||
expect(doc).to.have.property('jobtype');
|
||||
expect(doc).to.have.property('priority');
|
||||
expect(doc).to.have.property('timeout');
|
||||
});
|
||||
});
|
||||
|
||||
it('should contain optional data', function () {
|
||||
const optionals = {
|
||||
created_by: 'some_ident',
|
||||
};
|
||||
|
||||
const job = new Job(mockQueue, index, type, payload, optionals);
|
||||
return Promise.resolve(client.callAsInternalUser('get', {}, optionals))
|
||||
.then((doc) => {
|
||||
sinon.stub(client, 'callAsInternalUser').withArgs('get').returns(Promise.resolve(doc));
|
||||
})
|
||||
.then(() => {
|
||||
return job.get().then((doc) => {
|
||||
expect(doc).to.have.property('created_by', optionals.created_by);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('toJSON method', function () {
|
||||
beforeEach(function () {
|
||||
type = 'type2';
|
||||
payload = { id: '123' };
|
||||
options = {
|
||||
timeout: 4567,
|
||||
max_attempts: 9,
|
||||
priority: 8,
|
||||
};
|
||||
});
|
||||
|
||||
it('should return the static information about the job', function () {
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
|
||||
// toJSON is sync, should work before doc is written to elasticsearch
|
||||
expect(job.document).to.be(undefined);
|
||||
|
||||
const doc = job.toJSON();
|
||||
expect(doc).to.have.property('index', index);
|
||||
expect(doc).to.have.property('jobtype', type);
|
||||
expect(doc).to.have.property('created_by', defaultCreatedBy);
|
||||
expect(doc).to.have.property('timeout', options.timeout);
|
||||
expect(doc).to.have.property('max_attempts', options.max_attempts);
|
||||
expect(doc).to.have.property('priority', options.priority);
|
||||
expect(doc).to.have.property('id');
|
||||
expect(doc).to.not.have.property('version');
|
||||
});
|
||||
|
||||
it('should contain optional data', function () {
|
||||
const optionals = {
|
||||
created_by: 'some_ident',
|
||||
};
|
||||
|
||||
const job = new Job(mockQueue, index, type, payload, optionals);
|
||||
const doc = job.toJSON();
|
||||
expect(doc).to.have.property('created_by', optionals.created_by);
|
||||
});
|
||||
});
|
||||
});
|
|
@ -5,20 +5,17 @@
|
|||
*/
|
||||
|
||||
import { EventEmitter } from 'events';
|
||||
import { Job } from './job';
|
||||
import { Worker } from './worker';
|
||||
import { constants } from './constants';
|
||||
import { indexTimestamp } from './helpers/index_timestamp';
|
||||
import { omit } from 'lodash';
|
||||
|
||||
export { events } from './constants/events';
|
||||
|
||||
export class Esqueue extends EventEmitter {
|
||||
constructor(index, options = {}) {
|
||||
if (!index) throw new Error('Must specify an index to write to');
|
||||
|
||||
constructor(store, options = {}) {
|
||||
super();
|
||||
this.index = index;
|
||||
this.store = store; // for updating jobs in ES
|
||||
this.index = this.store.indexPrefix; // for polling for pending jobs
|
||||
this.settings = {
|
||||
interval: constants.DEFAULT_SETTING_INTERVAL,
|
||||
timeout: constants.DEFAULT_SETTING_TIMEOUT,
|
||||
|
@ -40,21 +37,6 @@ export class Esqueue extends EventEmitter {
|
|||
});
|
||||
}
|
||||
|
||||
addJob(jobtype, payload, opts = {}) {
|
||||
const timestamp = indexTimestamp(this.settings.interval, this.settings.dateSeparator);
|
||||
const index = `${this.index}-${timestamp}`;
|
||||
const defaults = {
|
||||
timeout: this.settings.timeout,
|
||||
};
|
||||
|
||||
const options = Object.assign(defaults, opts, {
|
||||
indexSettings: this.settings.indexSettings,
|
||||
logger: this._logger,
|
||||
});
|
||||
|
||||
return new Job(this, index, jobtype, payload, options);
|
||||
}
|
||||
|
||||
registerWorker(type, workerFn, opts) {
|
||||
const worker = new Worker(this, type, workerFn, { ...opts, logger: this._logger });
|
||||
this._workers.push(worker);
|
||||
|
|
|
@ -1,142 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import events from 'events';
|
||||
import Puid from 'puid';
|
||||
import { constants } from './constants';
|
||||
import { createIndex } from './helpers/create_index';
|
||||
import { isPlainObject } from 'lodash';
|
||||
|
||||
const puid = new Puid();
|
||||
|
||||
export class Job extends events.EventEmitter {
|
||||
constructor(queue, index, jobtype, payload, options = {}) {
|
||||
if (typeof jobtype !== 'string') throw new Error('Jobtype must be a string');
|
||||
if (!isPlainObject(payload)) throw new Error('Payload must be a plain object');
|
||||
|
||||
super();
|
||||
|
||||
this.queue = queue;
|
||||
this._client = this.queue.client;
|
||||
this.id = puid.generate();
|
||||
this.index = index;
|
||||
this.jobtype = jobtype;
|
||||
this.payload = payload;
|
||||
this.created_by = options.created_by || false;
|
||||
this.timeout = options.timeout || 10000;
|
||||
this.maxAttempts = options.max_attempts || 3;
|
||||
this.priority = Math.max(Math.min(options.priority || 10, 20), -20);
|
||||
this.indexSettings = options.indexSettings || {};
|
||||
this.browser_type = options.browser_type;
|
||||
|
||||
if (typeof this.maxAttempts !== 'number' || this.maxAttempts < 1) {
|
||||
throw new Error(`Invalid max_attempts: ${this.maxAttempts}`);
|
||||
}
|
||||
|
||||
this.debug = (msg, err) => {
|
||||
const logger = options.logger || function () {};
|
||||
const message = `${this.id} - ${msg}`;
|
||||
const tags = ['debug'];
|
||||
|
||||
if (err) {
|
||||
logger(`${message}: ${err}`, tags);
|
||||
return;
|
||||
}
|
||||
|
||||
logger(message, tags);
|
||||
};
|
||||
|
||||
const indexParams = {
|
||||
index: this.index,
|
||||
id: this.id,
|
||||
body: {
|
||||
jobtype: this.jobtype,
|
||||
meta: {
|
||||
// We are copying these values out of payload because these fields are indexed and can be aggregated on
|
||||
// for tracking stats, while payload contents are not.
|
||||
objectType: payload.objectType,
|
||||
layout: payload.layout ? payload.layout.id : 'none',
|
||||
},
|
||||
payload: this.payload,
|
||||
priority: this.priority,
|
||||
created_by: this.created_by,
|
||||
timeout: this.timeout,
|
||||
process_expiration: new Date(0), // use epoch so the job query works
|
||||
created_at: new Date(),
|
||||
attempts: 0,
|
||||
max_attempts: this.maxAttempts,
|
||||
status: constants.JOB_STATUS_PENDING,
|
||||
browser_type: this.browser_type,
|
||||
},
|
||||
};
|
||||
|
||||
if (options.headers) {
|
||||
indexParams.headers = options.headers;
|
||||
}
|
||||
|
||||
this.ready = createIndex(this._client, this.index, this.indexSettings)
|
||||
.then(() => this._client.callAsInternalUser('index', indexParams))
|
||||
.then((doc) => {
|
||||
this.document = {
|
||||
id: doc._id,
|
||||
index: doc._index,
|
||||
_seq_no: doc._seq_no,
|
||||
_primary_term: doc._primary_term,
|
||||
};
|
||||
this.debug(`Job created in index ${this.index}`);
|
||||
|
||||
return this._client
|
||||
.callAsInternalUser('indices.refresh', {
|
||||
index: this.index,
|
||||
})
|
||||
.then(() => {
|
||||
this.debug(`Job index refreshed ${this.index}`);
|
||||
this.emit(constants.EVENT_JOB_CREATED, this.document);
|
||||
});
|
||||
})
|
||||
.catch((err) => {
|
||||
this.debug('Job creation failed', err);
|
||||
this.emit(constants.EVENT_JOB_CREATE_ERROR, err);
|
||||
});
|
||||
}
|
||||
|
||||
emit(name, ...args) {
|
||||
super.emit(name, ...args);
|
||||
this.queue.emit(name, ...args);
|
||||
}
|
||||
|
||||
get() {
|
||||
return this.ready
|
||||
.then(() => {
|
||||
return this._client.callAsInternalUser('get', {
|
||||
index: this.index,
|
||||
id: this.id,
|
||||
});
|
||||
})
|
||||
.then((doc) => {
|
||||
return Object.assign(doc._source, {
|
||||
index: doc._index,
|
||||
id: doc._id,
|
||||
_seq_no: doc._seq_no,
|
||||
_primary_term: doc._primary_term,
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
toJSON() {
|
||||
return {
|
||||
id: this.id,
|
||||
index: this.index,
|
||||
jobtype: this.jobtype,
|
||||
created_by: this.created_by,
|
||||
payload: this.payload,
|
||||
timeout: this.timeout,
|
||||
max_attempts: this.maxAttempts,
|
||||
priority: this.priority,
|
||||
browser_type: this.browser_type,
|
||||
};
|
||||
}
|
||||
}
|
|
@ -158,8 +158,8 @@ export class Worker extends events.EventEmitter {
|
|||
kibana_name: this.kibanaName,
|
||||
};
|
||||
|
||||
return this._client
|
||||
.callAsInternalUser('update', {
|
||||
return this.queue.store
|
||||
.updateReport({
|
||||
index: job._index,
|
||||
id: job._id,
|
||||
if_seq_no: job._seq_no,
|
||||
|
@ -197,8 +197,8 @@ export class Worker extends events.EventEmitter {
|
|||
output: docOutput,
|
||||
});
|
||||
|
||||
return this._client
|
||||
.callAsInternalUser('update', {
|
||||
return this.queue.store
|
||||
.updateReport({
|
||||
index: job._index,
|
||||
id: job._id,
|
||||
if_seq_no: job._seq_no,
|
||||
|
@ -294,8 +294,8 @@ export class Worker extends events.EventEmitter {
|
|||
output: docOutput,
|
||||
};
|
||||
|
||||
return this._client
|
||||
.callAsInternalUser('update', {
|
||||
return this.queue.store
|
||||
.updateReport({
|
||||
index: job._index,
|
||||
id: job._id,
|
||||
if_seq_no: job._seq_no,
|
||||
|
|
|
@ -12,3 +12,4 @@ export { enqueueJobFactory } from './enqueue_job';
|
|||
export { getExportTypesRegistry } from './export_types_registry';
|
||||
export { runValidations } from './validate';
|
||||
export { startTrace } from './trace';
|
||||
export { ReportingStore } from './store';
|
||||
|
|
8
x-pack/plugins/reporting/server/lib/store/index.ts
Normal file
8
x-pack/plugins/reporting/server/lib/store/index.ts
Normal file
|
@ -0,0 +1,8 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
export { Report } from './report';
|
||||
export { ReportingStore } from './store';
|
|
@ -4,19 +4,20 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import moment from 'moment';
|
||||
import moment, { unitOfTime } from 'moment';
|
||||
|
||||
export const intervals = ['year', 'month', 'week', 'day', 'hour', 'minute'];
|
||||
|
||||
// TODO: This helper function can be removed by using `schema.duration` objects in the reporting config schema
|
||||
export function indexTimestamp(intervalStr, separator = '-') {
|
||||
export function indexTimestamp(intervalStr: string, separator = '-') {
|
||||
const startOf = intervalStr as unitOfTime.StartOf;
|
||||
if (separator.match(/[a-z]/i)) throw new Error('Interval separator can not be a letter');
|
||||
|
||||
const index = intervals.indexOf(intervalStr);
|
||||
if (index === -1) throw new Error('Invalid index interval: ', intervalStr);
|
||||
if (index === -1) throw new Error('Invalid index interval: ' + intervalStr);
|
||||
|
||||
const m = moment();
|
||||
m.startOf(intervalStr);
|
||||
m.startOf(startOf);
|
||||
|
||||
let dateString;
|
||||
switch (intervalStr) {
|
65
x-pack/plugins/reporting/server/lib/store/mapping.ts
Normal file
65
x-pack/plugins/reporting/server/lib/store/mapping.ts
Normal file
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
export const mapping = {
|
||||
meta: {
|
||||
// We are indexing these properties with both text and keyword fields because that's what will be auto generated
|
||||
// when an index already exists. This schema is only used when a reporting index doesn't exist. This way existing
|
||||
// reporting indexes and new reporting indexes will look the same and the data can be queried in the same
|
||||
// manner.
|
||||
properties: {
|
||||
/**
|
||||
* Type of object that is triggering this report. Should be either search, visualization or dashboard.
|
||||
* Used for job listing and telemetry stats only.
|
||||
*/
|
||||
objectType: {
|
||||
type: 'text',
|
||||
fields: {
|
||||
keyword: {
|
||||
type: 'keyword',
|
||||
ignore_above: 256,
|
||||
},
|
||||
},
|
||||
},
|
||||
/**
|
||||
* Can be either preserve_layout, print or none (in the case of csv export).
|
||||
* Used for phone home stats only.
|
||||
*/
|
||||
layout: {
|
||||
type: 'text',
|
||||
fields: {
|
||||
keyword: {
|
||||
type: 'keyword',
|
||||
ignore_above: 256,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
browser_type: { type: 'keyword' },
|
||||
jobtype: { type: 'keyword' },
|
||||
payload: { type: 'object', enabled: false },
|
||||
priority: { type: 'byte' },
|
||||
timeout: { type: 'long' },
|
||||
process_expiration: { type: 'date' },
|
||||
created_by: { type: 'keyword' },
|
||||
created_at: { type: 'date' },
|
||||
started_at: { type: 'date' },
|
||||
completed_at: { type: 'date' },
|
||||
attempts: { type: 'short' },
|
||||
max_attempts: { type: 'short' },
|
||||
kibana_name: { type: 'keyword' },
|
||||
kibana_id: { type: 'keyword' },
|
||||
status: { type: 'keyword' },
|
||||
output: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
content_type: { type: 'keyword' },
|
||||
size: { type: 'long' },
|
||||
content: { type: 'object', enabled: false },
|
||||
},
|
||||
},
|
||||
};
|
77
x-pack/plugins/reporting/server/lib/store/report.test.ts
Normal file
77
x-pack/plugins/reporting/server/lib/store/report.test.ts
Normal file
|
@ -0,0 +1,77 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { Report } from './report';
|
||||
|
||||
describe('Class Report', () => {
|
||||
it('constructs Report instance', () => {
|
||||
const opts = {
|
||||
index: '.reporting-test-index-12345',
|
||||
jobtype: 'test-report',
|
||||
created_by: 'created_by_test_string',
|
||||
browser_type: 'browser_type_test_string',
|
||||
max_attempts: 50,
|
||||
payload: { payload_test_field: 1 },
|
||||
timeout: 30000,
|
||||
priority: 1,
|
||||
};
|
||||
const report = new Report(opts);
|
||||
expect(report.toJSON()).toMatchObject({
|
||||
_primary_term: undefined,
|
||||
_seq_no: undefined,
|
||||
browser_type: 'browser_type_test_string',
|
||||
created_by: 'created_by_test_string',
|
||||
jobtype: 'test-report',
|
||||
max_attempts: 50,
|
||||
payload: {
|
||||
payload_test_field: 1,
|
||||
},
|
||||
priority: 1,
|
||||
timeout: 30000,
|
||||
});
|
||||
|
||||
expect(report.id).toBeDefined();
|
||||
});
|
||||
|
||||
it('updateWithDoc method syncs takes fields to sync ES metadata', () => {
|
||||
const opts = {
|
||||
index: '.reporting-test-index-12345',
|
||||
jobtype: 'test-report',
|
||||
created_by: 'created_by_test_string',
|
||||
browser_type: 'browser_type_test_string',
|
||||
max_attempts: 50,
|
||||
payload: { payload_test_field: 1 },
|
||||
timeout: 30000,
|
||||
priority: 1,
|
||||
};
|
||||
const report = new Report(opts);
|
||||
|
||||
const metadata = {
|
||||
_index: '.reporting-test-update',
|
||||
_id: '12342p9o387549o2345',
|
||||
_primary_term: 77,
|
||||
_seq_no: 99,
|
||||
};
|
||||
report.updateWithDoc(metadata);
|
||||
|
||||
expect(report.toJSON()).toMatchObject({
|
||||
index: '.reporting-test-update',
|
||||
_primary_term: 77,
|
||||
_seq_no: 99,
|
||||
browser_type: 'browser_type_test_string',
|
||||
created_by: 'created_by_test_string',
|
||||
jobtype: 'test-report',
|
||||
max_attempts: 50,
|
||||
payload: {
|
||||
payload_test_field: 1,
|
||||
},
|
||||
priority: 1,
|
||||
timeout: 30000,
|
||||
});
|
||||
|
||||
expect(report._id).toBe('12342p9o387549o2345');
|
||||
});
|
||||
});
|
85
x-pack/plugins/reporting/server/lib/store/report.ts
Normal file
85
x-pack/plugins/reporting/server/lib/store/report.ts
Normal file
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
// @ts-ignore no module definition
|
||||
import Puid from 'puid';
|
||||
|
||||
interface Payload {
|
||||
id?: string;
|
||||
index: string;
|
||||
jobtype: string;
|
||||
created_by: string | boolean;
|
||||
payload: unknown;
|
||||
browser_type: string;
|
||||
priority: number;
|
||||
max_attempts: number;
|
||||
timeout: number;
|
||||
}
|
||||
|
||||
const puid = new Puid();
|
||||
|
||||
export class Report {
|
||||
public readonly jobtype: string;
|
||||
public readonly created_by: string | boolean;
|
||||
public readonly payload: unknown;
|
||||
public readonly browser_type: string;
|
||||
public readonly id: string;
|
||||
|
||||
public readonly priority: number;
|
||||
// queue stuff, to be removed with Task Manager integration
|
||||
public readonly max_attempts: number;
|
||||
public readonly timeout: number;
|
||||
|
||||
public _index: string;
|
||||
public _id?: string; // set by ES
|
||||
public _primary_term?: unknown; // set by ES
|
||||
public _seq_no: unknown; // set by ES
|
||||
|
||||
/*
|
||||
* Create an unsaved report
|
||||
*/
|
||||
constructor(opts: Payload) {
|
||||
this.jobtype = opts.jobtype;
|
||||
this.created_by = opts.created_by;
|
||||
this.payload = opts.payload;
|
||||
this.browser_type = opts.browser_type;
|
||||
this.priority = opts.priority;
|
||||
this.max_attempts = opts.max_attempts;
|
||||
this.timeout = opts.timeout;
|
||||
this.id = puid.generate();
|
||||
|
||||
this._index = opts.index;
|
||||
}
|
||||
|
||||
/*
|
||||
* Update the report with "live" storage metadata
|
||||
*/
|
||||
updateWithDoc(doc: Partial<Report>) {
|
||||
if (doc._index) {
|
||||
this._index = doc._index; // can not be undefined
|
||||
}
|
||||
|
||||
this._id = doc._id;
|
||||
this._primary_term = doc._primary_term;
|
||||
this._seq_no = doc._seq_no;
|
||||
}
|
||||
|
||||
toJSON() {
|
||||
return {
|
||||
id: this.id,
|
||||
index: this._index,
|
||||
_seq_no: this._seq_no,
|
||||
_primary_term: this._primary_term,
|
||||
jobtype: this.jobtype,
|
||||
created_by: this.created_by,
|
||||
payload: this.payload,
|
||||
timeout: this.timeout,
|
||||
max_attempts: this.max_attempts,
|
||||
priority: this.priority,
|
||||
browser_type: this.browser_type,
|
||||
};
|
||||
}
|
||||
}
|
166
x-pack/plugins/reporting/server/lib/store/store.test.ts
Normal file
166
x-pack/plugins/reporting/server/lib/store/store.test.ts
Normal file
|
@ -0,0 +1,166 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import sinon from 'sinon';
|
||||
import { ReportingConfig, ReportingCore } from '../..';
|
||||
import { createMockReportingCore } from '../../test_helpers';
|
||||
import { createMockLevelLogger } from '../../test_helpers/create_mock_levellogger';
|
||||
import { ReportingStore } from './store';
|
||||
import { ElasticsearchServiceSetup } from 'src/core/server';
|
||||
|
||||
const getMockConfig = (mockConfigGet: sinon.SinonStub) => ({
|
||||
get: mockConfigGet,
|
||||
kbnConfig: { get: mockConfigGet },
|
||||
});
|
||||
|
||||
describe('ReportingStore', () => {
|
||||
const mockLogger = createMockLevelLogger();
|
||||
let mockConfig: ReportingConfig;
|
||||
let mockCore: ReportingCore;
|
||||
|
||||
const callClusterStub = sinon.stub();
|
||||
const mockElasticsearch = { legacy: { client: { callAsInternalUser: callClusterStub } } };
|
||||
|
||||
beforeEach(async () => {
|
||||
const mockConfigGet = sinon.stub();
|
||||
mockConfigGet.withArgs('index').returns('.reporting-test');
|
||||
mockConfigGet.withArgs('queue', 'indexInterval').returns('week');
|
||||
mockConfig = getMockConfig(mockConfigGet);
|
||||
mockCore = await createMockReportingCore(mockConfig);
|
||||
|
||||
callClusterStub.withArgs('indices.exists').resolves({});
|
||||
callClusterStub.withArgs('indices.create').resolves({});
|
||||
callClusterStub.withArgs('index').resolves({});
|
||||
callClusterStub.withArgs('indices.refresh').resolves({});
|
||||
callClusterStub.withArgs('update').resolves({});
|
||||
|
||||
mockCore.getElasticsearchService = () =>
|
||||
(mockElasticsearch as unknown) as ElasticsearchServiceSetup;
|
||||
});
|
||||
|
||||
describe('addReport', () => {
|
||||
it('returns Report object', async () => {
|
||||
const store = new ReportingStore(mockCore, mockLogger);
|
||||
const reportType = 'unknowntype';
|
||||
const reportPayload = {};
|
||||
const reportOptions = {
|
||||
timeout: 10000,
|
||||
created_by: 'created_by_string',
|
||||
browser_type: 'browser_type_string',
|
||||
max_attempts: 1,
|
||||
};
|
||||
await expect(
|
||||
store.addReport(reportType, reportPayload, reportOptions)
|
||||
).resolves.toMatchObject({
|
||||
_primary_term: undefined,
|
||||
_seq_no: undefined,
|
||||
browser_type: 'browser_type_string',
|
||||
created_by: 'created_by_string',
|
||||
jobtype: 'unknowntype',
|
||||
max_attempts: 1,
|
||||
payload: {},
|
||||
priority: 10,
|
||||
timeout: 10000,
|
||||
});
|
||||
});
|
||||
|
||||
it('throws if options has invalid indexInterval', async () => {
|
||||
const mockConfigGet = sinon.stub();
|
||||
mockConfigGet.withArgs('index').returns('.reporting-test');
|
||||
mockConfigGet.withArgs('queue', 'indexInterval').returns('centurially');
|
||||
mockConfig = getMockConfig(mockConfigGet);
|
||||
mockCore = await createMockReportingCore(mockConfig);
|
||||
|
||||
const store = new ReportingStore(mockCore, mockLogger);
|
||||
const reportType = 'unknowntype';
|
||||
const reportPayload = {};
|
||||
const reportOptions = {
|
||||
timeout: 10000,
|
||||
created_by: 'created_by_string',
|
||||
browser_type: 'browser_type_string',
|
||||
max_attempts: 1,
|
||||
};
|
||||
expect(
|
||||
store.addReport(reportType, reportPayload, reportOptions)
|
||||
).rejects.toMatchInlineSnapshot(`[Error: Invalid index interval: centurially]`);
|
||||
});
|
||||
|
||||
it('handles error creating the index', async () => {
|
||||
// setup
|
||||
callClusterStub.withArgs('indices.exists').resolves(false);
|
||||
callClusterStub.withArgs('indices.create').rejects(new Error('error'));
|
||||
|
||||
const store = new ReportingStore(mockCore, mockLogger);
|
||||
const reportType = 'unknowntype';
|
||||
const reportPayload = {};
|
||||
const reportOptions = {
|
||||
timeout: 10000,
|
||||
created_by: 'created_by_string',
|
||||
browser_type: 'browser_type_string',
|
||||
max_attempts: 1,
|
||||
};
|
||||
await expect(
|
||||
store.addReport(reportType, reportPayload, reportOptions)
|
||||
).rejects.toMatchInlineSnapshot(`[Error: error]`);
|
||||
});
|
||||
|
||||
/* Creating the index will fail, if there were multiple jobs staged in
|
||||
* parallel and creation completed from another Kibana instance. Only the
|
||||
* first request in line can successfully create it.
|
||||
* In spite of that race condition, adding the new job in Elasticsearch is
|
||||
* fine.
|
||||
*/
|
||||
it('ignores index creation error if the index already exists and continues adding the report', async () => {
|
||||
// setup
|
||||
callClusterStub.withArgs('indices.exists').resolves(false);
|
||||
callClusterStub.withArgs('indices.create').rejects(new Error('error'));
|
||||
|
||||
const store = new ReportingStore(mockCore, mockLogger);
|
||||
const reportType = 'unknowntype';
|
||||
const reportPayload = {};
|
||||
const reportOptions = {
|
||||
timeout: 10000,
|
||||
created_by: 'created_by_string',
|
||||
browser_type: 'browser_type_string',
|
||||
max_attempts: 1,
|
||||
};
|
||||
await expect(
|
||||
store.addReport(reportType, reportPayload, reportOptions)
|
||||
).rejects.toMatchInlineSnapshot(`[Error: error]`);
|
||||
});
|
||||
|
||||
it('skips creating the index if already exists', async () => {
|
||||
// setup
|
||||
callClusterStub.withArgs('indices.exists').resolves(false);
|
||||
callClusterStub
|
||||
.withArgs('indices.create')
|
||||
.rejects(new Error('resource_already_exists_exception')); // will be triggered but ignored
|
||||
|
||||
const store = new ReportingStore(mockCore, mockLogger);
|
||||
const reportType = 'unknowntype';
|
||||
const reportPayload = {};
|
||||
const reportOptions = {
|
||||
timeout: 10000,
|
||||
created_by: 'created_by_string',
|
||||
browser_type: 'browser_type_string',
|
||||
max_attempts: 1,
|
||||
};
|
||||
await expect(
|
||||
store.addReport(reportType, reportPayload, reportOptions)
|
||||
).resolves.toMatchObject({
|
||||
_primary_term: undefined,
|
||||
_seq_no: undefined,
|
||||
browser_type: 'browser_type_string',
|
||||
created_by: 'created_by_string',
|
||||
jobtype: 'unknowntype',
|
||||
max_attempts: 1,
|
||||
payload: {},
|
||||
priority: 10,
|
||||
timeout: 10000,
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
169
x-pack/plugins/reporting/server/lib/store/store.ts
Normal file
169
x-pack/plugins/reporting/server/lib/store/store.ts
Normal file
|
@ -0,0 +1,169 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { ElasticsearchServiceSetup } from 'src/core/server';
|
||||
import { LevelLogger } from '../';
|
||||
import { ReportingCore } from '../../';
|
||||
import { LayoutInstance } from '../../export_types/common/layouts';
|
||||
import { indexTimestamp } from './index_timestamp';
|
||||
import { mapping } from './mapping';
|
||||
import { Report } from './report';
|
||||
|
||||
export const statuses = {
|
||||
JOB_STATUS_PENDING: 'pending',
|
||||
JOB_STATUS_PROCESSING: 'processing',
|
||||
JOB_STATUS_COMPLETED: 'completed',
|
||||
JOB_STATUS_WARNINGS: 'completed_with_warnings',
|
||||
JOB_STATUS_FAILED: 'failed',
|
||||
JOB_STATUS_CANCELLED: 'cancelled',
|
||||
};
|
||||
|
||||
interface AddReportOpts {
|
||||
timeout: number;
|
||||
created_by: string | boolean;
|
||||
browser_type: string;
|
||||
max_attempts: number;
|
||||
}
|
||||
|
||||
interface UpdateQuery {
|
||||
index: string;
|
||||
id: string;
|
||||
if_seq_no: unknown;
|
||||
if_primary_term: unknown;
|
||||
body: { doc: Partial<Report> };
|
||||
}
|
||||
|
||||
/*
|
||||
* A class to give an interface to historical reports in the reporting.index
|
||||
* - track the state: pending, processing, completed, etc
|
||||
* - handle updates and deletes to the reporting document
|
||||
* - interface for downloading the report
|
||||
*/
|
||||
export class ReportingStore {
|
||||
public readonly indexPrefix: string;
|
||||
public readonly indexInterval: string;
|
||||
|
||||
private client: ElasticsearchServiceSetup['legacy']['client'];
|
||||
private logger: LevelLogger;
|
||||
|
||||
constructor(reporting: ReportingCore, logger: LevelLogger) {
|
||||
const config = reporting.getConfig();
|
||||
const elasticsearch = reporting.getElasticsearchService();
|
||||
|
||||
this.client = elasticsearch.legacy.client;
|
||||
this.indexPrefix = config.get('index');
|
||||
this.indexInterval = config.get('queue', 'indexInterval');
|
||||
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
private async createIndex(indexName: string) {
|
||||
return this.client
|
||||
.callAsInternalUser('indices.exists', {
|
||||
index: indexName,
|
||||
})
|
||||
.then((exists) => {
|
||||
if (exists) {
|
||||
return exists;
|
||||
}
|
||||
|
||||
const indexSettings = {
|
||||
number_of_shards: 1,
|
||||
auto_expand_replicas: '0-1',
|
||||
};
|
||||
const body = {
|
||||
settings: indexSettings,
|
||||
mappings: {
|
||||
properties: mapping,
|
||||
},
|
||||
};
|
||||
|
||||
return this.client
|
||||
.callAsInternalUser('indices.create', {
|
||||
index: indexName,
|
||||
body,
|
||||
})
|
||||
.then(() => true)
|
||||
.catch((err: Error) => {
|
||||
const isIndexExistsError = err.message.match(/resource_already_exists_exception/);
|
||||
if (isIndexExistsError) {
|
||||
// Do not fail a job if the job runner hits the race condition.
|
||||
this.logger.warn(`Automatic index creation failed: index already exists: ${err}`);
|
||||
return;
|
||||
}
|
||||
|
||||
throw err;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private async saveReport(report: Report) {
|
||||
const payload = report.payload as { objectType: string; layout: LayoutInstance };
|
||||
|
||||
const indexParams = {
|
||||
index: report._index,
|
||||
id: report.id,
|
||||
body: {
|
||||
jobtype: report.jobtype,
|
||||
meta: {
|
||||
// We are copying these values out of payload because these fields are indexed and can be aggregated on
|
||||
// for tracking stats, while payload contents are not.
|
||||
objectType: payload.objectType,
|
||||
layout: payload.layout ? payload.layout.id : 'none',
|
||||
},
|
||||
payload: report.payload,
|
||||
created_by: report.created_by,
|
||||
timeout: report.timeout,
|
||||
process_expiration: new Date(0), // use epoch so the job query works
|
||||
created_at: new Date(),
|
||||
attempts: 0,
|
||||
max_attempts: report.max_attempts,
|
||||
status: statuses.JOB_STATUS_PENDING,
|
||||
browser_type: report.browser_type,
|
||||
},
|
||||
};
|
||||
return this.client.callAsInternalUser('index', indexParams);
|
||||
}
|
||||
|
||||
private async refreshIndex(index: string) {
|
||||
return this.client.callAsInternalUser('indices.refresh', { index });
|
||||
}
|
||||
|
||||
public async addReport(type: string, payload: unknown, options: AddReportOpts): Promise<Report> {
|
||||
const timestamp = indexTimestamp(this.indexInterval);
|
||||
const index = `${this.indexPrefix}-${timestamp}`;
|
||||
await this.createIndex(index);
|
||||
|
||||
const report = new Report({
|
||||
index,
|
||||
payload,
|
||||
jobtype: type,
|
||||
created_by: options.created_by,
|
||||
browser_type: options.browser_type,
|
||||
max_attempts: options.max_attempts,
|
||||
timeout: options.timeout,
|
||||
priority: 10, // unused
|
||||
});
|
||||
|
||||
const doc = await this.saveReport(report);
|
||||
report.updateWithDoc(doc);
|
||||
|
||||
await this.refreshIndex(index);
|
||||
this.logger.info(`Successfully queued pending job: ${report._index}/${report.id}`);
|
||||
|
||||
return report;
|
||||
}
|
||||
|
||||
public async updateReport(query: UpdateQuery): Promise<Report> {
|
||||
return this.client.callAsInternalUser('update', {
|
||||
index: query.index,
|
||||
id: query.id,
|
||||
if_seq_no: query.if_seq_no,
|
||||
if_primary_term: query.if_primary_term,
|
||||
body: { doc: query.body.doc },
|
||||
});
|
||||
}
|
||||
}
|
|
@ -8,7 +8,13 @@ import { CoreSetup, CoreStart, Plugin, PluginInitializerContext } from 'src/core
|
|||
import { ReportingCore } from './';
|
||||
import { initializeBrowserDriverFactory } from './browsers';
|
||||
import { buildConfig, ReportingConfigType } from './config';
|
||||
import { createQueueFactory, enqueueJobFactory, LevelLogger, runValidations } from './lib';
|
||||
import {
|
||||
createQueueFactory,
|
||||
enqueueJobFactory,
|
||||
LevelLogger,
|
||||
runValidations,
|
||||
ReportingStore,
|
||||
} from './lib';
|
||||
import { registerRoutes } from './routes';
|
||||
import { setFieldFormats } from './services';
|
||||
import { ReportingSetup, ReportingSetupDeps, ReportingStart, ReportingStartDeps } from './types';
|
||||
|
@ -86,9 +92,9 @@ export class ReportingPlugin
|
|||
const config = reportingCore.getConfig();
|
||||
|
||||
const browserDriverFactory = await initializeBrowserDriverFactory(config, logger);
|
||||
|
||||
const esqueue = await createQueueFactory(reportingCore, logger); // starts polling for pending jobs
|
||||
const enqueueJob = enqueueJobFactory(reportingCore, logger); // called from generation routes
|
||||
const store = new ReportingStore(reportingCore, logger);
|
||||
const esqueue = await createQueueFactory(reportingCore, store, logger); // starts polling for pending jobs
|
||||
const enqueueJob = enqueueJobFactory(reportingCore, store, logger); // called from generation routes
|
||||
|
||||
reportingCore.pluginStart({
|
||||
browserDriverFactory,
|
||||
|
@ -96,6 +102,7 @@ export class ReportingPlugin
|
|||
uiSettings: core.uiSettings,
|
||||
esqueue,
|
||||
enqueueJob,
|
||||
store,
|
||||
});
|
||||
|
||||
// run self-check validations
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
import { LevelLogger } from '../lib';
|
||||
|
||||
export function createMockLevelLogger() {
|
||||
// eslint-disable-next-line no-console
|
||||
const consoleLogger = (tag: string) => (message: unknown) => console.log(tag, message);
|
||||
const innerLogger = {
|
||||
get: () => innerLogger,
|
||||
debug: consoleLogger('debug'),
|
||||
info: consoleLogger('info'),
|
||||
warn: consoleLogger('warn'),
|
||||
trace: consoleLogger('trace'),
|
||||
error: consoleLogger('error'),
|
||||
fatal: consoleLogger('fatal'),
|
||||
log: consoleLogger('log'),
|
||||
};
|
||||
return new LevelLogger(innerLogger);
|
||||
}
|
|
@ -20,6 +20,8 @@ import {
|
|||
} from '../browsers';
|
||||
import { ReportingInternalSetup, ReportingInternalStart } from '../core';
|
||||
import { ReportingStartDeps } from '../types';
|
||||
import { ReportingStore } from '../lib';
|
||||
import { createMockLevelLogger } from './create_mock_levellogger';
|
||||
|
||||
(initializeBrowserDriverFactory as jest.Mock<
|
||||
Promise<HeadlessChromiumDriverFactory>
|
||||
|
@ -37,13 +39,19 @@ const createMockPluginSetup = (setupMock?: any): ReportingInternalSetup => {
|
|||
};
|
||||
};
|
||||
|
||||
const createMockPluginStart = (startMock?: any): ReportingInternalStart => {
|
||||
const createMockPluginStart = (
|
||||
mockReportingCore: ReportingCore,
|
||||
startMock?: any
|
||||
): ReportingInternalStart => {
|
||||
const logger = createMockLevelLogger();
|
||||
const store = new ReportingStore(mockReportingCore, logger);
|
||||
return {
|
||||
browserDriverFactory: startMock.browserDriverFactory,
|
||||
enqueueJob: startMock.enqueueJob,
|
||||
esqueue: startMock.esqueue,
|
||||
savedObjects: startMock.savedObjects || { getScopedClient: jest.fn() },
|
||||
uiSettings: startMock.uiSettings || { asScopedToClient: () => ({ get: jest.fn() }) },
|
||||
store,
|
||||
};
|
||||
};
|
||||
|
||||
|
@ -60,9 +68,22 @@ export const createMockStartDeps = (startMock?: any): ReportingStartDeps => ({
|
|||
|
||||
export const createMockReportingCore = async (
|
||||
config: ReportingConfig,
|
||||
setupDepsMock: ReportingInternalSetup | undefined = createMockPluginSetup({}),
|
||||
startDepsMock: ReportingInternalStart | undefined = createMockPluginStart({})
|
||||
setupDepsMock: ReportingInternalSetup | undefined = undefined,
|
||||
startDepsMock: ReportingInternalStart | undefined = undefined
|
||||
) => {
|
||||
if (!setupDepsMock) {
|
||||
setupDepsMock = createMockPluginSetup({});
|
||||
}
|
||||
|
||||
const mockReportingCore = {
|
||||
getConfig: () => config,
|
||||
getElasticsearchService: () => setupDepsMock?.elasticsearch,
|
||||
} as ReportingCore;
|
||||
|
||||
if (!startDepsMock) {
|
||||
startDepsMock = createMockPluginStart(mockReportingCore, {});
|
||||
}
|
||||
|
||||
config = config || {};
|
||||
const core = new ReportingCore();
|
||||
|
||||
|
|
|
@ -7,8 +7,7 @@
|
|||
import expect from '@kbn/expect';
|
||||
import * as Rx from 'rxjs';
|
||||
import { filter, first, mapTo, switchMap, timeout } from 'rxjs/operators';
|
||||
// @ts-ignore no module definition
|
||||
import { indexTimestamp } from '../../plugins/reporting/server/lib/esqueue/helpers/index_timestamp';
|
||||
import { indexTimestamp } from '../../plugins/reporting/server/lib/store/index_timestamp';
|
||||
import { services as xpackServices } from '../functional/services';
|
||||
import { FtrProviderContext } from './ftr_provider_context';
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue