Make task manager index configurable again (#42394) (#42706)

* Initial work

* Fix type check

* Accept core API changes

* Fix broken tests

* Destructure index pattern
This commit is contained in:
Mike Côté 2019-08-06 13:01:58 -04:00 committed by GitHub
parent 4914cecf1a
commit 07ae78f0db
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 134 additions and 62 deletions

View file

@ -0,0 +1,22 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-server](./kibana-plugin-server.md) &gt; [SavedObjectsSchema](./kibana-plugin-server.savedobjectsschema.md) &gt; [getConvertToAliasScript](./kibana-plugin-server.savedobjectsschema.getconverttoaliasscript.md)
## SavedObjectsSchema.getConvertToAliasScript() method
<b>Signature:</b>
```typescript
getConvertToAliasScript(type: string): string | undefined;
```
## Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| type | <code>string</code> | |
<b>Returns:</b>
`string | undefined`

View file

@ -7,13 +7,14 @@
<b>Signature:</b>
```typescript
getIndexForType(type: string): string | undefined;
getIndexForType(config: Config, type: string): string | undefined;
```
## Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| config | <code>Config</code> | |
| type | <code>string</code> | |
<b>Returns:</b>

View file

@ -20,7 +20,8 @@ export declare class SavedObjectsSchema
| Method | Modifiers | Description |
| --- | --- | --- |
| [getIndexForType(type)](./kibana-plugin-server.savedobjectsschema.getindexfortype.md) | | |
| [getConvertToAliasScript(type)](./kibana-plugin-server.savedobjectsschema.getconverttoaliasscript.md) | | |
| [getIndexForType(config, type)](./kibana-plugin-server.savedobjectsschema.getindexfortype.md) | | |
| [isHiddenType(type)](./kibana-plugin-server.savedobjectsschema.ishiddentype.md) | | |
| [isNamespaceAgnostic(type)](./kibana-plugin-server.savedobjectsschema.isnamespaceagnostic.md) | | |

View file

@ -18,16 +18,19 @@
*/
import { createIndexMap } from './build_index_map';
import { ObjectToConfigAdapter } from '../../../config';
import { SavedObjectsSchema } from '../../schema';
test('mappings without index pattern goes to default index', () => {
const result = createIndexMap(
'.kibana',
{
const result = createIndexMap({
config: new ObjectToConfigAdapter({}),
kibanaIndexName: '.kibana',
schema: new SavedObjectsSchema({
type1: {
isNamespaceAgnostic: false,
},
},
{
}),
indexMap: {
type1: {
properties: {
field1: {
@ -35,8 +38,8 @@ test('mappings without index pattern goes to default index', () => {
},
},
},
}
);
},
});
expect(result).toEqual({
'.kibana': {
typeMappings: {
@ -53,15 +56,16 @@ test('mappings without index pattern goes to default index', () => {
});
test(`mappings with custom index pattern doesn't go to default index`, () => {
const result = createIndexMap(
'.kibana',
{
const result = createIndexMap({
config: new ObjectToConfigAdapter({}),
kibanaIndexName: '.kibana',
schema: new SavedObjectsSchema({
type1: {
isNamespaceAgnostic: false,
indexPattern: '.other_kibana',
},
},
{
}),
indexMap: {
type1: {
properties: {
field1: {
@ -69,8 +73,8 @@ test(`mappings with custom index pattern doesn't go to default index`, () => {
},
},
},
}
);
},
});
expect(result).toEqual({
'.other_kibana': {
typeMappings: {
@ -87,16 +91,17 @@ test(`mappings with custom index pattern doesn't go to default index`, () => {
});
test('creating a script gets added to the index pattern', () => {
const result = createIndexMap(
'.kibana',
{
const result = createIndexMap({
config: new ObjectToConfigAdapter({}),
kibanaIndexName: '.kibana',
schema: new SavedObjectsSchema({
type1: {
isNamespaceAgnostic: false,
indexPattern: '.other_kibana',
convertToAliasScript: `ctx._id = ctx._source.type + ':' + ctx._id`,
},
},
{
}),
indexMap: {
type1: {
properties: {
field1: {
@ -104,8 +109,8 @@ test('creating a script gets added to the index pattern', () => {
},
},
},
}
);
},
});
expect(result).toEqual({
'.other_kibana': {
script: `ctx._id = ctx._source.type + ':' + ctx._id`,
@ -124,7 +129,7 @@ test('creating a script gets added to the index pattern', () => {
test('throws when two scripts are defined for an index pattern', () => {
const defaultIndex = '.kibana';
const savedObjectSchemas = {
const schema = new SavedObjectsSchema({
type1: {
isNamespaceAgnostic: false,
convertToAliasScript: `ctx._id = ctx._source.type + ':' + ctx._id`,
@ -133,7 +138,7 @@ test('throws when two scripts are defined for an index pattern', () => {
isNamespaceAgnostic: false,
convertToAliasScript: `ctx._id = ctx._source.type + ':' + ctx._id`,
},
};
});
const indexMap = {
type1: {
properties: {
@ -151,7 +156,12 @@ test('throws when two scripts are defined for an index pattern', () => {
},
};
expect(() =>
createIndexMap(defaultIndex, savedObjectSchemas, indexMap)
createIndexMap({
config: new ObjectToConfigAdapter({}),
kibanaIndexName: defaultIndex,
schema,
indexMap,
})
).toThrowErrorMatchingInlineSnapshot(
`"convertToAliasScript has been defined more than once for index pattern \\".kibana\\""`
);

View file

@ -17,8 +17,16 @@
* under the License.
*/
import { Config } from '../../../config';
import { MappingProperties } from '../../mappings';
import { SavedObjectsSchemaDefinition } from '../../schema';
import { SavedObjectsSchema } from '../../schema';
export interface CreateIndexMapOptions {
config: Config;
kibanaIndexName: string;
schema: SavedObjectsSchema;
indexMap: MappingProperties;
}
export interface IndexMap {
[index: string]: {
@ -30,16 +38,17 @@ export interface IndexMap {
/*
* This file contains logic to convert savedObjectSchemas into a dictonary of indexes and documents
*/
export function createIndexMap(
defaultIndex: string,
savedObjectSchemas: SavedObjectsSchemaDefinition,
indexMap: MappingProperties
) {
export function createIndexMap({
config,
kibanaIndexName,
schema,
indexMap,
}: CreateIndexMapOptions) {
const map: IndexMap = {};
Object.keys(indexMap).forEach(type => {
const schema = savedObjectSchemas[type] || {};
const script = schema.convertToAliasScript;
const indexPattern = schema.indexPattern || defaultIndex;
const script = schema.getConvertToAliasScript(type);
// Defaults to kibanaIndexName if indexPattern isn't defined
const indexPattern = schema.getIndexForType(config, type) || kibanaIndexName;
if (!map.hasOwnProperty(indexPattern as string)) {
map[indexPattern] = { typeMappings: {} };
}

View file

@ -31,6 +31,7 @@ import { docValidator } from '../../validation';
import { buildActiveMappings, CallCluster, IndexMigrator, LogFn } from '../core';
import { DocumentMigrator, VersionedTransformer } from '../core/document_migrator';
import { createIndexMap } from '../core/build_index_map';
import { Config } from '../../../config';
export interface KbnServer {
server: Server;
version: string;
@ -92,13 +93,14 @@ export class KibanaMigrator {
// Wait until elasticsearch is green...
await server.plugins.elasticsearch.waitUntilReady();
const config = server.config();
const config = server.config() as Config;
const kibanaIndexName = config.get('kibana.index');
const indexMap = createIndexMap(
const indexMap = createIndexMap({
config,
kibanaIndexName,
this.kbnServer.uiExports.savedObjectSchemas,
this.mappingProperties
);
indexMap: this.mappingProperties,
schema: this.schema,
});
const migrators = Object.keys(indexMap).map(index => {
return new IndexMigrator({
@ -130,6 +132,7 @@ export class KibanaMigrator {
private mappingProperties: MappingProperties;
private log: LogFn;
private serializer: SavedObjectsSerializer;
private readonly schema: SavedObjectsSchema;
/**
* Creates an instance of KibanaMigrator.
@ -141,9 +144,8 @@ export class KibanaMigrator {
constructor({ kbnServer }: { kbnServer: KbnServer }) {
this.kbnServer = kbnServer;
this.serializer = new SavedObjectsSerializer(
new SavedObjectsSchema(kbnServer.uiExports.savedObjectSchemas)
);
this.schema = new SavedObjectsSchema(kbnServer.uiExports.savedObjectSchemas);
this.serializer = new SavedObjectsSerializer(this.schema);
this.mappingProperties = mergeProperties(kbnServer.uiExports.savedObjectMappings || []);

View file

@ -25,6 +25,7 @@ const createSchemaMock = () => {
getIndexForType: jest.fn().mockReturnValue('.kibana-test'),
isHiddenType: jest.fn().mockReturnValue(false),
isNamespaceAgnostic: jest.fn((type: string) => type === 'global'),
getConvertToAliasScript: jest.fn().mockReturnValue(undefined),
};
return mocked;
};

View file

@ -16,10 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
import { Config } from '../../config';
interface SavedObjectsSchemaTypeDefinition {
isNamespaceAgnostic: boolean;
hidden?: boolean;
indexPattern?: string;
indexPattern?: ((config: Config) => string) | string;
convertToAliasScript?: string;
}
@ -41,14 +44,21 @@ export class SavedObjectsSchema {
return false;
}
public getIndexForType(type: string): string | undefined {
public getIndexForType(config: Config, type: string): string | undefined {
if (this.definition != null && this.definition.hasOwnProperty(type)) {
return this.definition[type].indexPattern;
const { indexPattern } = this.definition[type];
return typeof indexPattern === 'function' ? indexPattern(config) : indexPattern;
} else {
return undefined;
}
}
public getConvertToAliasScript(type: string): string | undefined {
if (this.definition != null && this.definition.hasOwnProperty(type)) {
return this.definition[type].convertToAliasScript;
}
}
public isNamespaceAgnostic(type: string) {
// if no plugins have registered a uiExports.savedObjectSchemas,
// this.schema will be undefined, and no types are namespace agnostic

View file

@ -27,6 +27,7 @@ import { SavedObjectsErrorHelpers } from './errors';
import { decodeRequestVersion, encodeVersion, encodeHitVersion } from '../../version';
import { SavedObjectsSchema } from '../../schema';
import { KibanaMigrator } from '../../migrations';
import { Config } from '../../../config';
import { SavedObjectsSerializer, SanitizedSavedObjectDoc, RawDoc } from '../../serialization';
import {
SavedObject,
@ -64,6 +65,7 @@ const isLeft = <L, R>(either: Either<L, R>): either is Left<L> => {
export interface SavedObjectsRepositoryOptions {
index: string;
config: Config;
mappings: IndexMapping;
callCluster: CallCluster;
schema: SavedObjectsSchema;
@ -80,6 +82,7 @@ export interface IncrementCounterOptions extends SavedObjectsBaseOptions {
export class SavedObjectsRepository {
private _migrator: KibanaMigrator;
private _index: string;
private _config: Config;
private _mappings: IndexMapping;
private _schema: SavedObjectsSchema;
private _allowedTypes: string[];
@ -90,6 +93,7 @@ export class SavedObjectsRepository {
constructor(options: SavedObjectsRepositoryOptions) {
const {
index,
config,
mappings,
callCluster,
schema,
@ -108,6 +112,7 @@ export class SavedObjectsRepository {
// to returning them.
this._migrator = migrator;
this._index = index;
this._config = config;
this._mappings = mappings;
this._schema = schema;
if (allowedTypes.length === 0) {
@ -741,7 +746,7 @@ export class SavedObjectsRepository {
* @param type - the type
*/
private getIndexForType(type: string) {
return this._schema.getIndexForType(type) || this._index;
return this._schema.getIndexForType(this._config, type) || this._index;
}
/**
@ -753,7 +758,7 @@ export class SavedObjectsRepository {
*/
private getIndicesForTypes(types: string[]) {
const unique = (array: string[]) => [...new Set(array)];
return unique(types.map(t => this._schema.getIndexForType(t) || this._index));
return unique(types.map(t => this._schema.getIndexForType(this._config, t) || this._index));
}
private _getCurrentTime() {

View file

@ -900,7 +900,9 @@ export class SavedObjectsSchema {
// Warning: (ae-forgotten-export) The symbol "SavedObjectsSchemaDefinition" needs to be exported by the entry point index.d.ts
constructor(schemaDefinition?: SavedObjectsSchemaDefinition);
// (undocumented)
getIndexForType(type: string): string | undefined;
getConvertToAliasScript(type: string): string | undefined;
// (undocumented)
getIndexForType(config: Config, type: string): string | undefined;
// (undocumented)
isHiddenType(type: string): boolean;
// (undocumented)

View file

@ -83,6 +83,7 @@ export async function migrateKibanaIndex({ client, log, kibanaPluginIds }) {
'migrations.scrollDuration': '5m',
'migrations.batchSize': 100,
'migrations.pollInterval': 100,
'xpack.task_manager.index': '.kibana_task_manager',
};
const ready = async () => undefined;
const elasticsearch = {

View file

@ -114,9 +114,11 @@ export function savedObjectsMixin(kbnServer, server) {
});
const combinedTypes = visibleTypes.concat(extraTypes);
const allowedTypes = [...new Set(combinedTypes)];
const config = server.config();
return new SavedObjectsRepository({
index: server.config().get('kibana.index'),
index: config.get('kibana.index'),
config,
migrator,
mappings,
schema,

View file

@ -1,7 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export const TASK_MANAGER_INDEX = '.kibana_task_manager';

View file

@ -8,7 +8,6 @@ import { SavedObjectsSerializer, SavedObjectsSchema } from '../../../../src/core
import { TaskManager } from './task_manager';
import mappings from './mappings.json';
import { migrations } from './migrations';
import { TASK_MANAGER_INDEX } from './constants';
export function taskManager(kibana) {
return new kibana.Plugin({
@ -26,6 +25,9 @@ export function taskManager(kibana) {
.description('How often, in milliseconds, the task manager will look for more work.')
.min(1000)
.default(3000),
index: Joi.string()
.description('The name of the index used to store task information.')
.default('.kibana_task_manager'),
max_workers: Joi.number()
.description('The maximum number of tasks that this Kibana instance will run simultaneously.')
.min(1) // disable the task manager rather than trying to specify it with 0 workers
@ -61,8 +63,10 @@ export function taskManager(kibana) {
task: {
hidden: true,
isNamespaceAgnostic: true,
indexPattern: TASK_MANAGER_INDEX,
convertToAliasScript: `ctx._id = ctx._source.type + ':' + ctx._id`,
indexPattern(config) {
return config.get('xpack.task_manager.index');
},
},
},
},

View file

@ -68,6 +68,7 @@ export class TaskManager {
serializer: opts.serializer,
savedObjectsRepository: opts.savedObjectsRepository,
callCluster: server.plugins.elasticsearch.getCluster('admin').callWithInternalUser,
index: opts.config.get('xpack.task_manager.index'),
maxAttempts: opts.config.get('xpack.task_manager.max_attempts'),
definitions: this.definitions,
});

View file

@ -62,6 +62,7 @@ describe('TaskStore', () => {
})
);
const store = new TaskStore({
index: 'tasky',
serializer,
callCluster,
maxAttempts: 2,
@ -160,6 +161,7 @@ describe('TaskStore', () => {
async function testFetch(opts?: FetchOpts, hits: any[] = []) {
const callCluster = sinon.spy(async () => ({ hits: { hits } }));
const store = new TaskStore({
index: 'tasky',
serializer,
callCluster,
maxAttempts: 2,
@ -181,7 +183,7 @@ describe('TaskStore', () => {
test('empty call filters by type, sorts by runAt and id', async () => {
const { args } = await testFetch();
expect(args).toMatchObject({
index: '.kibana_task_manager',
index: 'tasky',
body: {
sort: [{ 'task.runAt': 'asc' }, { _id: 'desc' }],
query: { term: { type: 'task' } },
@ -350,6 +352,7 @@ describe('TaskStore', () => {
test('it returns normally with no tasks when the index does not exist.', async () => {
const callCluster = sinon.spy(async () => ({ hits: { hits: [] } }));
const store = new TaskStore({
index: 'tasky',
serializer,
callCluster,
definitions: taskDefinitions,
@ -580,6 +583,7 @@ describe('TaskStore', () => {
);
const store = new TaskStore({
index: 'tasky',
serializer,
callCluster: jest.fn(),
maxAttempts: 2,
@ -626,6 +630,7 @@ describe('TaskStore', () => {
const id = `id-${_.random(1, 20)}`;
const callCluster = jest.fn();
const store = new TaskStore({
index: 'tasky',
serializer,
callCluster,
maxAttempts: 2,

View file

@ -23,10 +23,10 @@ import {
TaskDictionary,
TaskInstance,
} from './task';
import { TASK_MANAGER_INDEX } from './constants';
export interface StoreOpts {
callCluster: ElasticJs;
index: string;
maxAttempts: number;
definitions: TaskDictionary<SanitizedTaskDefinition>;
savedObjectsRepository: SavedObjectsClientContract;
@ -50,6 +50,7 @@ export interface FetchResult {
*/
export class TaskStore {
public readonly maxAttempts: number;
public readonly index: string;
private callCluster: ElasticJs;
private definitions: TaskDictionary<SanitizedTaskDefinition>;
private savedObjectsRepository: SavedObjectsClientContract;
@ -59,6 +60,7 @@ export class TaskStore {
* Constructs a new TaskStore.
* @param {StoreOpts} opts
* @prop {CallCluster} callCluster - The elastic search connection
* @prop {string} index - The name of the task manager index
* @prop {number} maxAttempts - The maximum number of attempts before a task will be abandoned
* @prop {TaskDefinition} definition - The definition of the task being run
* @prop {serializer} - The saved object serializer
@ -66,6 +68,7 @@ export class TaskStore {
*/
constructor(opts: StoreOpts) {
this.callCluster = opts.callCluster;
this.index = opts.index;
this.maxAttempts = opts.maxAttempts;
this.definitions = opts.definitions;
this.serializer = opts.serializer;
@ -229,7 +232,7 @@ export class TaskStore {
: queryOnlyTasks;
const result = await this.callCluster('search', {
index: TASK_MANAGER_INDEX,
index: this.index,
ignoreUnavailable: true,
body: {
...opts,