Remove types from Esqueue (#32146) (#32229)

This commit is contained in:
Josh Dover 2019-02-28 14:07:57 -06:00 committed by GitHub
parent a8dde3eeb0
commit 1574dee431
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 16 additions and 106 deletions

View file

@ -4,8 +4,6 @@
* you may not use this file except in compliance with the Elastic License.
*/
export const QUEUE_DOCTYPE = 'esqueue';
export const JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY =
'xpack.reporting.jobCompletionNotifications';

View file

@ -6,7 +6,6 @@
import { Esqueue } from './esqueue';
import { createWorkersFactory } from './create_workers';
import { QUEUE_DOCTYPE } from '../../common/constants';
import { oncePerServer } from './once_per_server';
import { createTaggedLogger } from './create_tagged_logger';
@ -19,7 +18,6 @@ function createQueueFn(server) {
const logger = createTaggedLogger(server, ['reporting', 'esqueue']);
const queueOptions = {
doctype: QUEUE_DOCTYPE,
interval: queueConfig.indexInterval,
timeout: queueConfig.timeout,
dateSeparator: dateSeparator,

View file

@ -16,7 +16,6 @@ export function ClientMock() {
const shardCount = 2;
return Promise.resolve({
_index: params.index || 'index',
_type: params.type || constants.DEFAULT_SETTING_DOCTYPE,
_id: params.id || uniqueId('testDoc'),
_seq_no: 1,
_primary_term: 1,
@ -48,7 +47,6 @@ export function ClientMock() {
return Promise.resolve({
_index: params.index || 'index',
_type: params.type || constants.DEFAULT_SETTING_DOCTYPE,
_id: params.id || 'AVRPRLnlp7Ur1SZXfT-T',
_seq_no: params._seq_no || 1,
_primary_term: params._primary_term || 1,
@ -62,7 +60,6 @@ export function ClientMock() {
const hits = times(count, () => {
return {
_index: params.index || 'index',
_type: params.type || constants.DEFAULT_SETTING_DOCTYPE,
_id: uniqueId('documentId'),
_seq_no: random(1, 5),
_primar_term: random(1, 5),
@ -94,7 +91,6 @@ export function ClientMock() {
const shardCount = 2;
return Promise.resolve({
_index: params.index || 'index',
_type: params.type || constants.DEFAULT_SETTING_DOCTYPE,
_id: params.id || uniqueId('testDoc'),
_seq_no: params.if_seq_no + 1 || 2,
_primary_term: params.if_primary_term + 1 || 2,

View file

@ -29,9 +29,8 @@ describe('Create Index', function () {
.then((exists) => expect(exists).to.be(true));
});
it('should create the index with type mappings and default settings', function () {
it('should create the index with mappings and default settings', function () {
const indexName = 'test-index';
const docType = constants.DEFAULT_SETTING_DOCTYPE;
const settings = constants.DEFAULT_SETTING_INDEX_SETTINGS;
const result = createIndex(client, indexName);
@ -44,34 +43,12 @@ describe('Create Index', function () {
expect(payload.body).to.have.property('settings');
expect(payload.body.settings).to.eql(settings);
expect(payload.body).to.have.property('mappings');
expect(payload.body.mappings).to.have.property(docType);
expect(payload.body.mappings[docType]).to.have.property('properties');
});
});
it('should accept a custom doctype', function () {
const indexName = 'test-index';
const docType = 'my_type';
const settings = constants.DEFAULT_SETTING_INDEX_SETTINGS;
const result = createIndex(client, indexName, docType);
return result
.then(function () {
const payload = createSpy.getCall(0).args[1];
sinon.assert.callCount(createSpy, 1);
expect(payload).to.have.property('index', indexName);
expect(payload).to.have.property('body');
expect(payload.body).to.have.property('settings');
expect(payload.body.settings).to.eql(settings);
expect(payload.body).to.have.property('mappings');
expect(payload.body.mappings).to.have.property(docType);
expect(payload.body.mappings[docType]).to.have.property('properties');
expect(payload.body.mappings).to.have.property('properties');
});
});
it('should create the index with custom settings', function () {
const indexName = 'test-index';
const docType = constants.DEFAULT_SETTING_DOCTYPE;
const settings = {
...constants.DEFAULT_SETTING_INDEX_SETTINGS,
auto_expand_replicas: false,
@ -79,7 +56,7 @@ describe('Create Index', function () {
number_of_replicas: 1,
format: '3000',
};
const result = createIndex(client, indexName, docType, settings);
const result = createIndex(client, indexName, settings);
return result
.then(function () {
@ -90,8 +67,7 @@ describe('Create Index', function () {
expect(payload.body).to.have.property('settings');
expect(payload.body.settings).to.eql(settings);
expect(payload.body).to.have.property('mappings');
expect(payload.body.mappings).to.have.property(docType);
expect(payload.body.mappings[docType]).to.have.property('properties');
expect(payload.body.mappings).to.have.property('properties');
});
});
});

View file

@ -77,7 +77,6 @@ describe('Esqueue class', function () {
const job = queue.addJob(jobType, payload);
const options = job.getProp('options');
expect(options).to.have.property('timeout', constants.DEFAULT_SETTING_TIMEOUT);
expect(options).to.have.property('doctype', constants.DEFAULT_SETTING_DOCTYPE);
});
it('should pass queue index settings', function () {
@ -133,14 +132,12 @@ describe('Esqueue class', function () {
it('should pass worker options', function () {
const workerOptions = {
size: 12,
doctype: 'tests'
};
queue = new Esqueue('esqueue', { client });
const worker = queue.registerWorker('type', noop, workerOptions);
const options = worker.getProp('options');
expect(options.size).to.equal(workerOptions.size);
expect(options.doctype).to.equal(workerOptions.doctype);
});
});

View file

@ -84,7 +84,6 @@ describe('Job Class', function () {
const args = createIndexMock.getCall(0).args;
expect(args[0]).to.equal(client);
expect(args[1]).to.equal(index);
expect(args[2]).to.equal(constants.DEFAULT_SETTING_DOCTYPE);
});
});
@ -93,7 +92,6 @@ describe('Job Class', function () {
return job.ready.then(() => {
const indexArgs = validateDoc(indexSpy);
expect(indexArgs).to.have.property('index', index);
expect(indexArgs).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE);
expect(indexArgs).to.have.property('body');
expect(indexArgs.body).to.have.property('payload', payload);
});
@ -104,7 +102,6 @@ describe('Job Class', function () {
return job.ready.then(() => {
const indexArgs = validateDoc(indexSpy);
expect(indexArgs).to.have.property('index', index);
expect(indexArgs).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE);
expect(indexArgs).to.have.property('body');
expect(indexArgs.body).to.have.property('jobtype', type);
});
@ -135,7 +132,6 @@ describe('Job Class', function () {
try {
expect(jobDoc).to.have.property('id');
expect(jobDoc).to.have.property('index');
expect(jobDoc).to.have.property('type');
expect(jobDoc).to.have.property('_seq_no');
expect(jobDoc).to.have.property('_primary_term');
done();
@ -348,7 +344,6 @@ describe('Job Class', function () {
.then((doc) => {
const jobDoc = job.document; // document should be resolved
expect(doc).to.have.property('index', index);
expect(doc).to.have.property('type', jobDoc.type);
expect(doc).to.have.property('id', jobDoc.id);
expect(doc).to.have.property('_seq_no', jobDoc._seq_no);
expect(doc).to.have.property('_primary_term', jobDoc._primary_term);
@ -401,7 +396,6 @@ describe('Job Class', function () {
const doc = job.toJSON();
expect(doc).to.have.property('index', index);
expect(doc).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE);
expect(doc).to.have.property('jobtype', type);
expect(doc).to.have.property('created_by', defaultCreatedBy);
expect(doc).to.have.property('timeout', options.timeout);

View file

@ -105,7 +105,6 @@ describe('Worker class', function () {
expect(worker).to.have.property('jobtype', jobtype);
expect(worker).to.have.property('workerFn', workerFn);
expect(worker).to.have.property('checkSize');
expect(worker).to.have.property('doctype');
});
it('should have a unique ID', function () {
@ -320,25 +319,6 @@ describe('Worker class', function () {
});
});
describe('query parameters', function () {
beforeEach(() => {
searchStub = sinon.stub(mockQueue.client, 'callWithInternalUser')
.callsFake(() => Promise.resolve({ hits: { hits: [] } }));
});
it('should query by default doctype', function () {
const params = getSearchParams();
expect(params).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE);
});
it('should query by custom doctype', function () {
const doctype = 'custom_test';
const params = getSearchParams('type', { doctype });
expect(params).to.have.property('type', doctype);
});
});
describe('query body', function () {
const conditionPath = 'query.bool.filter.bool';
const jobtype = 'test_jobtype';
@ -442,7 +422,6 @@ describe('Worker class', function () {
worker._claimJob(job);
const query = updateSpy.firstCall.args[1];
expect(query).to.have.property('index', job._index);
expect(query).to.have.property('type', job._type);
expect(query).to.have.property('id', job._id);
expect(query).to.have.property('if_seq_no', job._seq_no);
expect(query).to.have.property('if_primary_term', job._primary_term);
@ -519,7 +498,6 @@ describe('Worker class', function () {
describe('find a pending job to claim', function () {
const getMockJobs = (status = 'pending') => ([{
_index: 'myIndex',
_type: 'test',
_id: 12345,
_seq_no: 3,
_primary_term: 3,
@ -583,7 +561,6 @@ describe('Worker class', function () {
return worker._claimPendingJobs(getMockJobs())
.then(claimedJob => {
expect(claimedJob._index).to.be('myIndex');
expect(claimedJob._type).to.be('test');
expect(claimedJob._source.jobtype).to.be('jobtype');
expect(claimedJob._source.status).to.be('processing');
expect(claimedJob.test).to.be('cool');
@ -616,7 +593,6 @@ describe('Worker class', function () {
worker._failJob(job);
const query = updateSpy.firstCall.args[1];
expect(query).to.have.property('index', job._index);
expect(query).to.have.property('type', job._type);
expect(query).to.have.property('id', job._id);
expect(query).to.have.property('if_seq_no', job._seq_no);
expect(query).to.have.property('if_primary_term', job._primary_term);
@ -744,7 +720,6 @@ describe('Worker class', function () {
sinon.assert.calledOnce(updateSpy);
const query = updateSpy.firstCall.args[1];
expect(query).to.have.property('index', job._index);
expect(query).to.have.property('type', job._type);
expect(query).to.have.property('id', job._id);
expect(query).to.have.property('if_seq_no', job._seq_no);
expect(query).to.have.property('if_primary_term', job._primary_term);
@ -785,7 +760,6 @@ describe('Worker class', function () {
expect(workerJob).to.have.property('job');
expect(workerJob.job).to.have.property('id');
expect(workerJob.job).to.have.property('index');
expect(workerJob.job).to.have.property('type');
expect(workerJob).to.have.property('output');
expect(workerJob.output).to.have.property('content');

View file

@ -8,7 +8,6 @@ export const defaultSettings = {
DEFAULT_SETTING_TIMEOUT: 10000,
DEFAULT_SETTING_DATE_SEPARATOR: '-',
DEFAULT_SETTING_INTERVAL: 'week',
DEFAULT_SETTING_DOCTYPE: 'esqueue',
DEFAULT_SETTING_INDEX_SETTINGS: {
number_of_shards: 1,
auto_expand_replicas: '0-1',

View file

@ -66,18 +66,14 @@ const schema = {
}
};
export function createIndex(client, indexName,
doctype = constants.DEFAULT_SETTING_DOCTYPE,
indexSettings = { }) {
export function createIndex(client, indexName, indexSettings = {}) {
const body = {
settings: {
...constants.DEFAULT_SETTING_INDEX_SETTINGS,
...indexSettings
},
mappings: {
[doctype]: {
properties: schema
}
properties: schema
}
};
@ -88,7 +84,6 @@ export function createIndex(client, indexName,
if (!exists) {
return client.callWithInternalUser('indices.create', {
index: indexName,
include_type_name: true,
body: body
})
.then(() => true)

View file

@ -22,7 +22,6 @@ export class Esqueue extends EventEmitter {
this.settings = {
interval: constants.DEFAULT_SETTING_INTERVAL,
timeout: constants.DEFAULT_SETTING_TIMEOUT,
doctype: constants.DEFAULT_SETTING_DOCTYPE,
dateSeparator: constants.DEFAULT_SETTING_DATE_SEPARATOR,
...omit(options, [ 'client' ])
};
@ -43,7 +42,7 @@ export class Esqueue extends EventEmitter {
});
}
addJob(type, payload, opts = {}) {
addJob(jobtype, payload, opts = {}) {
const timestamp = indexTimestamp(this.settings.interval, this.settings.dateSeparator);
const index = `${this.index}-${timestamp}`;
const defaults = {
@ -51,12 +50,11 @@ export class Esqueue extends EventEmitter {
};
const options = Object.assign(defaults, opts, {
doctype: this.settings.doctype,
indexSettings: this.settings.indexSettings,
logger: this._logger
});
return new Job(this, index, type, payload, options);
return new Job(this, index, jobtype, payload, options);
}
registerWorker(type, workerFn, opts) {

View file

@ -13,8 +13,8 @@ import { isPlainObject } from 'lodash';
const puid = new Puid();
export class Job extends events.EventEmitter {
constructor(queue, index, type, payload, options = {}) {
if (typeof type !== 'string') throw new Error('Type must be a string');
constructor(queue, index, jobtype, payload, options = {}) {
if (typeof jobtype !== 'string') throw new Error('Jobtype must be a string');
if (!isPlainObject(payload)) throw new Error('Payload must be a plain object');
super();
@ -23,13 +23,12 @@ export class Job extends events.EventEmitter {
this._client = this.queue.client;
this.id = puid.generate();
this.index = index;
this.jobtype = type;
this.jobtype = jobtype;
this.payload = payload;
this.created_by = options.created_by || false;
this.timeout = options.timeout || 10000;
this.maxAttempts = options.max_attempts || 3;
this.priority = Math.max(Math.min(options.priority || 10, 20), -20);
this.doctype = options.doctype || constants.DEFAULT_SETTING_DOCTYPE;
this.indexSettings = options.indexSettings || {};
this.browser_type = options.browser_type;
@ -48,7 +47,6 @@ export class Job extends events.EventEmitter {
const indexParams = {
index: this.index,
type: this.doctype,
id: this.id,
body: {
jobtype: this.jobtype,
@ -75,12 +73,11 @@ export class Job extends events.EventEmitter {
indexParams.headers = options.headers;
}
this.ready = createIndex(this._client, this.index, this.doctype, this.indexSettings)
this.ready = createIndex(this._client, this.index, this.indexSettings)
.then(() => this._client.callWithInternalUser('index', indexParams))
.then((doc) => {
this.document = {
id: doc._id,
type: doc._type,
index: doc._index,
_seq_no: doc._seq_no,
_primary_term: doc._primary_term,
@ -110,7 +107,6 @@ export class Job extends events.EventEmitter {
.then(() => {
return this._client.callWithInternalUser('get', {
index: this.index,
type: this.doctype,
id: this.id
});
})
@ -118,7 +114,6 @@ export class Job extends events.EventEmitter {
return Object.assign(doc._source, {
index: doc._index,
id: doc._id,
type: doc._type,
_seq_no: doc._seq_no,
_primary_term: doc._primary_term,
});
@ -129,7 +124,6 @@ export class Job extends events.EventEmitter {
return {
id: this.id,
index: this.index,
type: this.doctype,
jobtype: this.jobtype,
created_by: this.created_by,
payload: this.payload,

View file

@ -17,7 +17,6 @@ const puid = new Puid();
function formatJobObject(job) {
return {
index: job._index,
type: job._type,
id: job._id,
};
}
@ -56,7 +55,6 @@ export class Worker extends events.EventEmitter {
this.jobtype = type;
this.workerFn = workerFn;
this.checkSize = opts.size || 10;
this.doctype = opts.doctype || constants.DEFAULT_SETTING_DOCTYPE;
this.debug = getLogger(opts, this.id, 'debug');
this.warn = getLogger(opts, this.id, 'warn');
@ -86,7 +84,6 @@ export class Worker extends events.EventEmitter {
id: this.id,
index: this.queue.index,
jobType: this.jobType,
doctype: this.doctype,
};
}
@ -128,7 +125,6 @@ export class Worker extends events.EventEmitter {
return this._client.callWithInternalUser('update', {
index: job._index,
type: job._type,
id: job._id,
if_seq_no: job._seq_no,
if_primary_term: job._primary_term,
@ -166,7 +162,6 @@ export class Worker extends events.EventEmitter {
return this._client.callWithInternalUser('update', {
index: job._index,
type: job._type,
id: job._id,
if_seq_no: job._seq_no,
if_primary_term: job._primary_term,
@ -244,7 +239,6 @@ export class Worker extends events.EventEmitter {
return this._client.callWithInternalUser('update', {
index: job._index,
type: job._type,
id: job._id,
if_seq_no: job._seq_no,
if_primary_term: job._primary_term,
@ -388,7 +382,6 @@ export class Worker extends events.EventEmitter {
return this._client.callWithInternalUser('search', {
index: `${this.queue.index}-*`,
type: this.doctype,
body: query
})
.then((results) => {

View file

@ -5,7 +5,6 @@
*/
import { get } from 'lodash';
import { QUEUE_DOCTYPE } from '../../common/constants';
import { oncePerServer } from './once_per_server';
const defaultSize = 10;
@ -18,7 +17,7 @@ function jobsQueryFn(server) {
return get(user, 'username', false);
}
function execQuery(type, body) {
function execQuery(queryType, body) {
const defaultBody = {
search: {
_source: {
@ -33,11 +32,10 @@ function jobsQueryFn(server) {
const query = {
index: `${index}-*`,
type: QUEUE_DOCTYPE,
body: Object.assign(defaultBody[type] || {}, body)
body: Object.assign(defaultBody[queryType] || {}, body)
};
return callWithInternalUser(type, query)
return callWithInternalUser(queryType, query)
.catch((err) => {
if (err instanceof esErrors['401']) return;
if (err instanceof esErrors['403']) return;
@ -73,7 +71,7 @@ function jobsQueryFn(server) {
if (jobIds) {
body.query.constant_score.filter.bool.must.push({
ids: { type: QUEUE_DOCTYPE, values: jobIds }
ids: { values: jobIds }
});
}