[6.x] [Task Manager] Ensure putTemplate will only create/update the index template (#28540) (#30163)

* [Task Manager] Ensure putTemplate will only create/update the index template (#28540)

* get template version

* ensure putTemplate only creates/updates the template

* fix tests

* new test for throwing error re old template

* note comment

* clean up test

* test clarification

* store kibana metadata in scheduled task doc

* map `dynamic: false`

* logging

* add kibana uuid

* fix tests

* last todo

* fetching available task uses apiVersion in the query

* scheduledAt

* ts fix

* ts fix

* fix snapshot

* await to fail test if snapshot does not match

* fix bad merge

* remove _.get

* fix typeError happening in tests

* undo merged 7.0 changes
This commit is contained in:
Tim Sullivan 2019-02-05 21:08:41 -07:00 committed by GitHub
parent abd49fa46f
commit c32e4434c9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 336 additions and 58 deletions

View file

@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import xPackage from '../../package.json';
import { getTemplateVersion } from './lib/get_template_version';
export const TASK_MANAGER_API_VERSION = 1;
export const TASK_MANAGER_TEMPLATE_VERSION = getTemplateVersion(xPackage.version);

View file

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { getTemplateVersion } from './get_template_version';
describe('getTemplateVersion', () => {
it('converts a release build version string into an integer', () => {
const versionStr1 = '7.1.2';
expect(getTemplateVersion(versionStr1)).toBe(7010299);
const versionStr2 = '10.1.0';
expect(getTemplateVersion(versionStr2)).toBe(10010099);
});
it('converts a alpha build version string into an integer', () => {
const versionStr1 = '7.0.0-alpha1';
expect(getTemplateVersion(versionStr1)).toBe(7000001);
const versionStr2 = '7.0.0-alpha3';
expect(getTemplateVersion(versionStr2)).toBe(7000003);
});
it('converts a beta build version string into an integer', () => {
const versionStr1 = '7.0.0-beta4';
expect(getTemplateVersion(versionStr1)).toBe(7000029);
const versionStr5 = '7.0.0-beta8';
expect(getTemplateVersion(versionStr5)).toBe(7000033);
});
it('converts a snapshot build version string into an integer', () => {
expect(getTemplateVersion('8.0.0-alpha1')).toBe(8000001);
expect(getTemplateVersion('8.0.0-alpha1-snapshot')).toBe(8000001);
});
it('not intended to handle any version parts with 3-digits: it will create malformed integer values', () => {
expect(getTemplateVersion('60.61.1') === getTemplateVersion('6.6.101')).toBe(true); // both produce 60610199
expect(getTemplateVersion('1.32.0') < getTemplateVersion('1.3.223423')).toBe(true);
});
});

View file

@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { padLeft } from 'lodash';
/*
* The logic for ID is: XXYYZZAA, where XX is major version, YY is minor
* version, ZZ is revision, and AA is alpha/beta/rc indicator.
*
* AA values below 25 are for alpha builder (since 5.0), and above 25 and below
* 50 are beta builds, and below 99 are RC builds, with 99 indicating a release
* the (internal) format of the id is there so we can easily do after/before
* checks on the id
*
* Note: the conversion method is carried over from Elasticsearch:
* https://github.com/elastic/elasticsearch/blob/de962b2/server/src/main/java/org/elasticsearch/Version.java
*/
export function getTemplateVersion(versionStr: string): number {
// break up the string parts
const splitted = versionStr.split('.');
const minorStr = splitted[2] || '';
// pad each part with leading 0 to make 2 characters
const padded = splitted.map((v: string) => {
const vMatches = v.match(/\d+/);
if (vMatches) {
return padLeft(vMatches[0], 2, '0');
}
return '00';
});
const [majorV, minorV, patchV] = padded;
// append the alpha/beta/rc indicator
let buildV;
if (minorStr.match('alpha')) {
const matches = minorStr.match(/alpha(?<alpha>\d+)/);
if (matches != null && matches.groups != null) {
const alphaVerInt = parseInt(matches.groups.alpha, 10); // alpha build indicator
buildV = padLeft(`${alphaVerInt}`, 2, '0');
}
} else if (minorStr.match('beta')) {
const matches = minorStr.match(/beta(?<beta>\d+)/);
if (matches != null && matches.groups != null) {
const alphaVerInt = parseInt(matches.groups.beta, 10) + 25; // beta build indicator
buildV = padLeft(`${alphaVerInt}`, 2, '0');
}
} else {
buildV = '99'; // release build indicator
}
const joinedParts = [majorV, minorV, patchV, buildV].join('');
return parseInt(joinedParts, 10);
}

View file

@ -24,6 +24,7 @@ const getMockConcreteTaskInstance = () => {
attempts: number;
status: TaskStatus;
runAt: Date;
scheduledAt: Date;
state: any;
taskType: string;
params: any;
@ -33,6 +34,7 @@ const getMockConcreteTaskInstance = () => {
attempts: 0,
status: 'idle',
runAt: new Date(moment('2018-09-18T05:33:09.588Z').valueOf()),
scheduledAt: new Date(moment('2018-09-18T05:33:09.588Z').valueOf()),
state: {},
taskType: 'nice_task',
params: { abc: 'def' },
@ -53,7 +55,7 @@ const defaultBeforeRun = async (opts: RunContext) => {
};
describe('addMiddlewareToChain', () => {
it('chains the beforeSave functions', () => {
it('chains the beforeSave functions', async () => {
const m1 = {
beforeSave: async (opts: BeforeSaveOpts) => {
Object.assign(opts.taskInstance.params, { m1: true });
@ -80,8 +82,10 @@ describe('addMiddlewareToChain', () => {
middlewareChain = addMiddlewareToChain(m1, m2);
middlewareChain = addMiddlewareToChain(middlewareChain, m3);
middlewareChain.beforeSave({ taskInstance: getMockTaskInstance() }).then((saveOpts: any) => {
expect(saveOpts).toMatchInlineSnapshot(`
await middlewareChain
.beforeSave({ taskInstance: getMockTaskInstance() })
.then((saveOpts: any) => {
expect(saveOpts).toMatchInlineSnapshot(`
Object {
"taskInstance": Object {
"params": Object {
@ -95,10 +99,10 @@ Object {
},
}
`);
});
});
});
it('chains the beforeRun functions', () => {
it('chains the beforeRun functions', async () => {
const m1 = {
beforeSave: defaultBeforeSave,
beforeRun: async (opts: RunContext) => {
@ -131,7 +135,7 @@ Object {
middlewareChain = addMiddlewareToChain(m1, m2);
middlewareChain = addMiddlewareToChain(middlewareChain, m3);
middlewareChain
await middlewareChain
.beforeRun(getMockRunContext(getMockConcreteTaskInstance()))
.then(contextOpts => {
expect(contextOpts).toMatchInlineSnapshot(`
@ -147,6 +151,7 @@ Object {
"abc": "def",
},
"runAt": 2018-09-18T05:33:09.588Z,
"scheduledAt": 2018-09-18T05:33:09.588Z,
"state": Object {},
"status": "idle",
"taskType": "nice_task",

View file

@ -158,6 +158,12 @@ export interface TaskInstance {
*/
taskType: string;
/**
* The date and time that this task was originally scheduled. This is used
* for convenience to task run functions, and for troubleshooting.
*/
scheduledAt?: Date;
/**
* The date and time that this task is scheduled to be run. It is not
* guaranteed to run at this time, but it is guaranteed not to run earlier
@ -210,6 +216,12 @@ export interface ConcreteTaskInstance extends TaskInstance {
*/
version: number;
/**
* The date and time that this task was originally scheduled. This is used
* for convenience to task run functions, and for troubleshooting.
*/
scheduledAt: Date;
/**
* The number of unsuccessful attempts since the last successful run. This
* will be zeroed out after a successful run.

View file

@ -75,6 +75,9 @@ describe('TaskManager', () => {
});
function testOpts() {
const callCluster = sinon.stub();
callCluster.withArgs('indices.getTemplate').returns(Promise.resolve({ tasky: {} }));
const $test = {
events: {} as any,
afterPluginsInit: _.noop,
@ -100,7 +103,7 @@ describe('TaskManager', () => {
plugins: {
elasticsearch: {
getCluster() {
return { callWithInternalUser: _.noop };
return { callWithInternalUser: callCluster };
},
status: {
on(eventName: string, callback: () => any) {

View file

@ -53,11 +53,15 @@ export class TaskManager {
const logger = new TaskManagerLogger((...args: any[]) => server.log(...args));
/* Kibana UUID needs to be pulled live (not cached), as it takes a long time
* to initialize, and can change after startup */
const store = new TaskStore({
callCluster: server.plugins.elasticsearch.getCluster('admin').callWithInternalUser,
index: config.get('xpack.task_manager.index'),
maxAttempts: config.get('xpack.task_manager.max_attempts'),
supportedTypes: Object.keys(this.definitions),
logger,
getKibanaUuid: () => config.get('server.uuid'),
});
const pool = new TaskPool({
logger,
@ -94,6 +98,7 @@ export class TaskManager {
this.isInitialized = true;
})
.catch((err: Error) => {
// FIXME: check the type of error to make sure it's actually an ES error
logger.warning(err.message);
// rety again to initialize store and poller, using the timing of

View file

@ -14,9 +14,13 @@ let store: TaskStore;
describe('TaskPoller', () => {
beforeEach(() => {
const callCluster = sinon.spy();
const callCluster = sinon.stub();
callCluster.withArgs('indices.getTemplate').returns(Promise.resolve({ tasky: {} }));
const getKibanaUuid = sinon.stub().returns('kibana-123-uuid-test');
store = new TaskStore({
callCluster,
getKibanaUuid,
logger: mockLogger(),
index: 'tasky',
maxAttempts: 2,
supportedTypes: ['a', 'b', 'c'],

View file

@ -232,6 +232,7 @@ describe('TaskManagerRunner', () => {
taskType: 'bar',
version: 32,
runAt: new Date(),
scheduledAt: new Date(),
attempts: 0,
params: {},
scope: ['reporting'],

View file

@ -6,15 +6,25 @@
import _ from 'lodash';
import sinon from 'sinon';
import {
TASK_MANAGER_API_VERSION as API_VERSION,
TASK_MANAGER_TEMPLATE_VERSION as TEMPLATE_VERSION,
} from './constants';
import { TaskInstance, TaskStatus } from './task';
import { FetchOpts, TaskStore } from './task_store';
import { mockLogger } from './test_utils';
const getKibanaUuid = sinon.stub().returns('kibana-uuid-123-test');
describe('TaskStore', () => {
describe('init', () => {
test('creates the task manager index', async () => {
const callCluster = sinon.spy();
const callCluster = sinon.stub();
callCluster.withArgs('indices.getTemplate').returns(Promise.resolve({ tasky: {} }));
const store = new TaskStore({
callCluster,
getKibanaUuid,
logger: mockLogger(),
index: 'tasky',
maxAttempts: 2,
supportedTypes: ['a', 'b', 'c'],
@ -22,7 +32,7 @@ describe('TaskStore', () => {
await store.init();
sinon.assert.calledOnce(callCluster);
sinon.assert.calledTwice(callCluster); // store.init calls twice: once to check for existing template, once to put the template (if needed)
sinon.assert.calledWithMatch(callCluster, 'indices.putTemplate', {
body: {
@ -35,27 +45,64 @@ describe('TaskStore', () => {
name: 'tasky',
});
});
test('logs a warning if newer index template exists', async () => {
const callCluster = sinon.stub();
callCluster
.withArgs('indices.getTemplate')
.returns(Promise.resolve({ tasky: { version: Infinity } }));
const logger = {
info: sinon.spy(),
debug: sinon.spy(),
warning: sinon.spy(),
error: sinon.spy(),
};
const store = new TaskStore({
callCluster,
getKibanaUuid,
logger,
index: 'tasky',
maxAttempts: 2,
supportedTypes: ['a', 'b', 'c'],
});
await store.init();
const loggingCall = logger.warning.getCall(0);
expect(loggingCall.args[0]).toBe(
`This Kibana instance defines an older template version (${TEMPLATE_VERSION}) than is currently in Elasticsearch (Infinity). ` +
`Because of the potential for non-backwards compatible changes, this Kibana instance will only be able to claim scheduled tasks with ` +
`"kibana.apiVersion" <= ${API_VERSION} in the task metadata.`
);
expect(logger.warning.calledOnce).toBe(true);
});
});
describe('schedule', () => {
async function testSchedule(task: TaskInstance) {
const callCluster = sinon.spy(() =>
const callCluster = sinon.stub();
callCluster.withArgs('index').returns(
Promise.resolve({
_id: 'testid',
_version: 3344,
})
);
callCluster.withArgs('indices.getTemplate').returns(Promise.resolve({ tasky: {} }));
const store = new TaskStore({
callCluster,
getKibanaUuid,
logger: mockLogger(),
index: 'tasky',
maxAttempts: 2,
supportedTypes: ['report', 'dernstraight', 'yawn'],
});
await store.init();
const result = await store.schedule(task);
sinon.assert.calledTwice(callCluster);
sinon.assert.calledThrice(callCluster);
return { result, callCluster, arg: callCluster.args[1][1] };
return { result, callCluster, arg: callCluster.args[2][1] };
}
test('serializes the params and state', async () => {
@ -80,7 +127,7 @@ describe('TaskStore', () => {
});
});
test('retiurns a concrete task instance', async () => {
test('returns a concrete task instance', async () => {
const task = {
params: { hello: 'world' },
state: { foo: 'bar' },
@ -119,6 +166,8 @@ describe('TaskStore', () => {
const callCluster = sinon.spy(async () => ({ hits: { hits } }));
const store = new TaskStore({
callCluster,
getKibanaUuid,
logger: mockLogger(),
index: 'tasky',
maxAttempts: 2,
supportedTypes: ['a', 'b', 'c'],
@ -283,6 +332,7 @@ describe('TaskStore', () => {
const callCluster = sinon.spy(async () => ({ hits: { hits } }));
const store = new TaskStore({
callCluster,
logger: mockLogger(),
supportedTypes: ['a', 'b', 'c'],
index: 'tasky',
maxAttempts: 2,
@ -304,6 +354,8 @@ describe('TaskStore', () => {
const callCluster = sinon.spy(async () => ({ hits: { hits: [] } }));
const store = new TaskStore({
callCluster,
getKibanaUuid,
logger: mockLogger(),
supportedTypes: ['a', 'b', 'c'],
index: 'tasky',
maxAttempts: 2,
@ -339,6 +391,7 @@ describe('TaskStore', () => {
{ terms: { 'task.taskType': ['foo', 'bar'] } },
{ range: { 'task.attempts': { lte: maxAttempts } } },
{ range: { 'task.runAt': { lte: 'now' } } },
{ range: { 'kibana.apiVersion': { lte: 1 } } },
],
},
},
@ -432,6 +485,7 @@ describe('TaskStore', () => {
const runAt = new Date();
const task = {
runAt,
scheduledAt: runAt,
id: 'task:324242',
params: { hello: 'world' },
state: { foo: 'bar' },
@ -444,6 +498,8 @@ describe('TaskStore', () => {
const callCluster = sinon.spy(async () => ({ _version: task.version + 1 }));
const store = new TaskStore({
callCluster,
getKibanaUuid,
logger: mockLogger(),
index: 'tasky',
maxAttempts: 2,
supportedTypes: ['a', 'b', 'c'],
@ -488,6 +544,8 @@ describe('TaskStore', () => {
);
const store = new TaskStore({
callCluster,
getKibanaUuid,
logger: mockLogger(),
index: 'myindex',
maxAttempts: 2,
supportedTypes: ['a'],

View file

@ -8,15 +8,22 @@
* This module contains helpers for managing the task manager storage layer.
*/
import {
TASK_MANAGER_API_VERSION as API_VERSION,
TASK_MANAGER_TEMPLATE_VERSION as TEMPLATE_VERSION,
} from './constants';
import { Logger } from './lib/logger';
import { ConcreteTaskInstance, ElasticJs, TaskInstance, TaskStatus } from './task';
const DOC_TYPE = '_doc';
export interface StoreOpts {
callCluster: ElasticJs;
getKibanaUuid: () => string;
index: string;
maxAttempts: number;
supportedTypes: string[];
logger: Logger;
}
export interface FetchOpts {
@ -45,8 +52,14 @@ export interface RawTaskDoc {
_version: number;
_source: {
type: string;
kibana: {
uuid: string;
version: number;
apiVersion: number;
};
task: {
taskType: string;
scheduledAt: Date;
runAt: Date;
interval?: string;
attempts: number;
@ -65,10 +78,12 @@ export interface RawTaskDoc {
*/
export class TaskStore {
public readonly maxAttempts: number;
public getKibanaUuid: () => string;
public readonly index: string;
private callCluster: ElasticJs;
private index: string;
private supportedTypes: string[];
private _isInitialized = false; // tslint:disable-line:variable-name
private logger: Logger;
/**
* Constructs a new TaskStore.
@ -77,12 +92,15 @@ export class TaskStore {
* @prop {string} index - The name of the task manager index
* @prop {number} maxAttempts - The maximum number of attempts before a task will be abandoned
* @prop {string[]} supportedTypes - The task types supported by this store
* @prop {Logger} logger - The task manager logger.
*/
constructor(opts: StoreOpts) {
this.callCluster = opts.callCluster;
this.index = opts.index;
this.maxAttempts = opts.maxAttempts;
this.supportedTypes = opts.supportedTypes;
this.logger = opts.logger;
this.getKibanaUuid = opts.getKibanaUuid;
this.fetchAvailableTasks = this.fetchAvailableTasks.bind(this);
}
@ -96,54 +114,108 @@ export class TaskStore {
}
/**
* Initializes the store, ensuring the task manager index is created and up to date.
* Initializes the store, ensuring the task manager index template is created
* and the version is up to date.
*/
public async init() {
if (this._isInitialized) {
throw new Error('TaskStore has already been initialized!');
}
const properties = {
type: { type: 'keyword' },
task: {
properties: {
taskType: { type: 'keyword' },
runAt: { type: 'date' },
interval: { type: 'text' },
attempts: { type: 'integer' },
status: { type: 'keyword' },
params: { type: 'text' },
state: { type: 'text' },
user: { type: 'keyword' },
scope: { type: 'keyword' },
},
},
};
let existingVersion = -Infinity;
const templateName = this.index;
try {
const templateResult = await this.callCluster('indices.putTemplate', {
name: this.index,
body: {
index_patterns: [this.index],
mappings: {
_doc: {
dynamic: 'strict',
properties,
},
},
settings: {
number_of_shards: 1,
auto_expand_replicas: '0-1',
},
},
// check if template exists
const templateCheck = await this.callCluster('indices.getTemplate', {
name: templateName,
filter_path: '*.version',
});
this._isInitialized = true;
return templateResult;
// extract the existing version
const template = templateCheck[templateName] || {};
existingVersion = template.version || 0;
} catch (err) {
throw err;
if (err.statusCode !== 404) {
throw err; // ignore not found
}
}
return;
if (existingVersion > TEMPLATE_VERSION) {
// Do not trample a newer version template
this.logger.warning(
`This Kibana instance defines an older template version (${TEMPLATE_VERSION}) than is currently in Elasticsearch (${existingVersion}). ` +
`Because of the potential for non-backwards compatible changes, this Kibana instance will only be able to claim scheduled tasks with ` +
`"kibana.apiVersion" <= ${API_VERSION} in the task metadata.`
);
return;
} else if (existingVersion === TEMPLATE_VERSION) {
// The latest template is already saved, so just log a debug line.
this.logger.debug(
`Not installing ${this.index} index template: version ${TEMPLATE_VERSION} already exists.`
);
return;
}
// Activate template creation / update
if (existingVersion > 0) {
this.logger.info(
`Upgrading ${
this.index
} index template. Old version: ${existingVersion}, New version: ${TEMPLATE_VERSION}.`
);
} else {
this.logger.info(`Installing ${this.index} index template version: ${TEMPLATE_VERSION}.`);
}
const templateResult = await this.callCluster('indices.putTemplate', {
name: templateName,
body: {
index_patterns: [this.index],
mappings: {
[DOC_TYPE]: {
dynamic: false,
properties: {
type: { type: 'keyword' },
task: {
properties: {
taskType: { type: 'keyword' },
scheduledAt: { type: 'date' },
runAt: { type: 'date' },
interval: { type: 'text' },
attempts: { type: 'integer' },
status: { type: 'keyword' },
params: { type: 'text' },
state: { type: 'text' },
user: { type: 'keyword' },
scope: { type: 'keyword' },
},
},
kibana: {
properties: {
apiVersion: { type: 'integer' }, // 1, 2, 3, etc
uuid: { type: 'keyword' }, //
version: { type: 'integer' }, // 7000099, etc
},
},
},
},
},
settings: {
number_of_shards: 1,
auto_expand_replicas: '0-1',
},
version: TEMPLATE_VERSION,
},
});
this._isInitialized = true;
this.logger.info(
`Installed ${
this.index
} index template: version ${TEMPLATE_VERSION} (API version ${API_VERSION})`
);
return templateResult;
}
get isInitialized() {
@ -168,7 +240,7 @@ export class TaskStore {
);
}
const { id, ...body } = rawSource(taskInstance);
const { id, ...body } = rawSource(taskInstance, this);
const result = await this.callCluster('index', {
id,
body,
@ -184,6 +256,7 @@ export class TaskStore {
version: result._version,
attempts: 0,
status: task.status,
scheduledAt: task.scheduledAt,
runAt: task.runAt,
state: taskInstance.state || {},
};
@ -222,6 +295,7 @@ export class TaskStore {
{ terms: { 'task.taskType': this.supportedTypes } },
{ range: { 'task.attempts': { lte: this.maxAttempts } } },
{ range: { 'task.runAt': { lte: 'now' } } },
{ range: { 'kibana.apiVersion': { lte: API_VERSION } } },
],
},
},
@ -241,7 +315,7 @@ export class TaskStore {
* @returns {Promise<TaskDoc>}
*/
public async update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance> {
const rawDoc = taskDocToRaw(doc, this.index);
const rawDoc = taskDocToRaw(doc, this);
const { _version } = await this.callCluster('update', {
body: {
@ -326,13 +400,14 @@ function paginatableSort(sort: any[] = []) {
return [...sort, sortById];
}
function rawSource(doc: TaskInstance) {
function rawSource(doc: TaskInstance, store: TaskStore) {
const { id, ...taskFields } = doc;
const source = {
...taskFields,
params: JSON.stringify(doc.params || {}),
state: JSON.stringify(doc.state || {}),
attempts: (doc as ConcreteTaskInstance).attempts || 0,
scheduledAt: doc.scheduledAt || new Date(),
runAt: doc.runAt || new Date(),
status: (doc as ConcreteTaskInstance).status || 'idle',
};
@ -345,16 +420,21 @@ function rawSource(doc: TaskInstance) {
id,
type: 'task',
task: source,
kibana: {
uuid: store.getKibanaUuid(), // needs to be pulled live
version: TEMPLATE_VERSION,
apiVersion: API_VERSION,
},
};
}
function taskDocToRaw(doc: ConcreteTaskInstance, index: string): RawTaskDoc {
const { type, task } = rawSource(doc);
function taskDocToRaw(doc: ConcreteTaskInstance, store: TaskStore): RawTaskDoc {
const { type, task, kibana } = rawSource(doc, store);
return {
_id: doc.id,
_index: index,
_source: { type, task },
_index: store.index,
_source: { type, task, kibana },
_type: DOC_TYPE,
_version: doc.version,
};