Telemetry for Visualizations by type (#28793) (#29625)

* task runner and usage collector for visualizations by type

* type is always just "visualization"

* drop the I- prefix for interfaces

* bug fixes

* ts fix

* comment perfection

* just usage.

* const for task numworkers

* use mapValues

* get next midnight module

* move to oss_telemtry

* test fix

* errMessage.includes(NotInitialized)
This commit is contained in:
Tim Sullivan 2019-01-31 10:53:20 -07:00 committed by GitHub
parent 33e6b48e41
commit df19d7e80d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 612 additions and 0 deletions

View file

@ -35,6 +35,7 @@ import { remoteClusters } from './plugins/remote_clusters';
import { crossClusterReplication } from './plugins/cross_cluster_replication';
import { upgradeAssistant } from './plugins/upgrade_assistant';
import { uptime } from './plugins/uptime';
import { ossTelemetry } from './plugins/oss_telemetry';
module.exports = function (kibana) {
return [
@ -69,5 +70,6 @@ module.exports = function (kibana) {
crossClusterReplication(kibana),
upgradeAssistant(kibana),
uptime(kibana),
ossTelemetry(kibana),
];
};

View file

@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export const PLUGIN_ID = 'oss_telemetry'; // prefix used for registering properties with services from this plugin
export const VIS_TELEMETRY_TASK = 'vis_telemetry'; // suffix for the _id of our task instance, which must be `get`-able
export const VIS_USAGE_TYPE = 'visualization_types'; // suffix for the properties of data registered with the usage service
export const VIS_TELEMETRY_TASK_NUM_WORKERS = 10; // by default it's 100% their workers. Users can scale up and set task manager's numWorkers higher for other tasks to be able to run concurrently in a single Kibana instance with this one

66
x-pack/plugins/oss_telemetry/index.d.ts vendored Normal file
View file

@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export interface VisState {
type: string;
}
export interface Visualization {
visState: string;
}
export interface SavedObjectDoc {
_id: string;
_source: {
visualization: Visualization;
type: string;
};
}
export interface ESQueryResponse {
hits: {
hits: SavedObjectDoc[];
};
}
export interface TaskInstance {
state: {
runs: number;
stats: any;
};
error?: any;
}
export interface HapiServer {
taskManager: {
registerTaskDefinitions: (opts: any) => void;
schedule: (opts: any) => Promise<void>;
fetch: (
opts: any
) => Promise<{
docs: TaskInstance[];
}>;
};
plugins: {
xpack_main: any;
elasticsearch: {
getCluster: (
cluster: string
) => {
callWithInternalUser: () => Promise<ESQueryResponse>;
};
};
};
usage: {
collectorSet: {
register: (collector: any) => void;
makeUsageCollector: (collectorOpts: any) => void;
};
};
config: () => {
get: (prop: string) => any;
};
}

View file

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { registerCollectors } from './server/lib/collectors';
import { registerTasks, scheduleTasks } from './server/lib/tasks';
import { PLUGIN_ID } from './constants';
export const ossTelemetry = (kibana) => {
return new kibana.Plugin({
id: PLUGIN_ID,
require: ['elasticsearch', 'xpack_main', 'task_manager'],
init(server) {
registerCollectors(server);
registerTasks(server);
scheduleTasks(server);
}
});
};

View file

@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { HapiServer } from '../../../';
import { registerVisualizationsCollector } from './visualizations/register_usage_collector';
export function registerCollectors(server: HapiServer) {
registerVisualizationsCollector(server);
}

View file

@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import sinon from 'sinon';
import { HapiServer } from '../../../../';
import {
getMockCallWithInternal,
getMockKbnServer,
getMockTaskFetch,
} from '../../../../test_utils';
import { getUsageCollector } from './get_usage_collector';
describe('getVisualizationsCollector#fetch', () => {
let mockKbnServer: HapiServer;
beforeEach(() => {
mockKbnServer = getMockKbnServer(getMockCallWithInternal(), getMockTaskFetch());
});
test('can return empty stats', async () => {
const { type, fetch } = getUsageCollector(mockKbnServer);
expect(type).toBe('visualization_types');
const fetchResult = await fetch();
expect(fetchResult).toEqual({});
});
test('provides known stats', async () => {
const mockTaskFetch = getMockTaskFetch([
{
state: {
runs: 1,
stats: { comic_books: { total: 16, max: 12, min: 2, avg: 6 } },
},
},
]);
mockKbnServer = getMockKbnServer(getMockCallWithInternal(), mockTaskFetch);
const { type, fetch } = getUsageCollector(mockKbnServer);
expect(type).toBe('visualization_types');
const fetchResult = await fetch();
expect(fetchResult).toEqual({ comic_books: { avg: 6, max: 12, min: 2, total: 16 } });
});
describe('Error handling', () => {
test('Silently handles Task Manager NotInitialized', async () => {
const mockTaskFetch = sinon.stub();
mockTaskFetch.rejects(
new Error('NotInitialized taskManager is still waiting for plugins to load')
);
mockKbnServer = getMockKbnServer(getMockCallWithInternal(), mockTaskFetch);
const { fetch } = getUsageCollector(mockKbnServer);
await expect(fetch()).resolves.toBe(undefined);
});
// In real life, the CollectorSet calls fetch and handles errors
test('defers the errors', async () => {
const mockTaskFetch = sinon.stub();
mockTaskFetch.rejects(new Error('BOOM'));
mockKbnServer = getMockKbnServer(getMockCallWithInternal(), mockTaskFetch);
const { fetch } = getUsageCollector(mockKbnServer);
await expect(fetch()).rejects.toMatchObject(new Error('BOOM'));
});
});
});

View file

@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { get } from 'lodash';
import { HapiServer } from '../../../../';
import { PLUGIN_ID, VIS_TELEMETRY_TASK, VIS_USAGE_TYPE } from '../../../../constants';
export function getUsageCollector(server: HapiServer) {
const { taskManager } = server;
return {
type: VIS_USAGE_TYPE,
fetch: async () => {
let docs;
try {
({ docs } = await taskManager.fetch({
query: { bool: { filter: { term: { _id: `${PLUGIN_ID}-${VIS_TELEMETRY_TASK}` } } } },
}));
} catch (err) {
const errMessage = err && err.message ? err.message : err.toString();
/*
* The usage service WILL to try to fetch from this collector before the task manager has been initialized, because the task manager
* has to wait for all plugins to initialize first.
* It's fine to ignore it as next time around it will be initialized (or it will throw a different type of error)
*/
if (errMessage.includes('NotInitialized')) {
docs = {};
} else {
throw err;
}
}
// get the accumulated state from the recurring task
return get(docs, '[0].state.stats');
},
};
}

View file

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { HapiServer } from '../../../../';
import { getUsageCollector } from './get_usage_collector';
export function registerVisualizationsCollector(server: HapiServer): void {
const { usage } = server;
const collector = usage.collectorSet.makeUsageCollector(getUsageCollector(server));
usage.collectorSet.register(collector);
}

View file

@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import moment from 'moment';
import { getNextMidnight } from './get_next_midnight';
describe('getNextMidnight', () => {
test('Returns the next time and date of midnight as an iso string', () => {
const nextMidnightMoment = moment()
.add(1, 'days')
.startOf('day')
.toISOString();
expect(getNextMidnight()).toEqual(nextMidnightMoment);
});
});

View file

@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export function getNextMidnight() {
const nextMidnight = new Date();
nextMidnight.setHours(0, 0, 0, 0);
nextMidnight.setDate(nextMidnight.getDate() + 1);
return nextMidnight.toISOString();
}

View file

@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { HapiServer } from '../../../';
import { PLUGIN_ID, VIS_TELEMETRY_TASK, VIS_TELEMETRY_TASK_NUM_WORKERS } from '../../../constants';
import { visualizationsTaskRunner } from './visualizations/task_runner';
export function registerTasks(server: HapiServer) {
const { taskManager } = server;
taskManager.registerTaskDefinitions({
[VIS_TELEMETRY_TASK]: {
title: 'X-Pack telemetry calculator for Visualizations',
type: VIS_TELEMETRY_TASK,
numWorkers: VIS_TELEMETRY_TASK_NUM_WORKERS, // by default it's 100% their workers
createTaskRunner({ taskInstance, kbnServer }: { kbnServer: any; taskInstance: any }) {
return {
run: visualizationsTaskRunner(taskInstance, kbnServer),
};
},
},
});
}
export function scheduleTasks(server: HapiServer) {
const { taskManager } = server;
const { kbnServer } = server.plugins.xpack_main.status.plugin;
kbnServer.afterPluginsInit(() => {
taskManager.schedule({
id: `${PLUGIN_ID}-${VIS_TELEMETRY_TASK}`,
taskType: VIS_TELEMETRY_TASK,
state: { stats: {}, runs: 0 },
});
});
}

View file

@ -0,0 +1,144 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import moment from 'moment';
import { HapiServer, TaskInstance } from '../../../../';
import {
getMockCallWithInternal,
getMockKbnServer,
getMockTaskInstance,
} from '../../../../test_utils';
import { visualizationsTaskRunner } from './task_runner';
describe('visualizationsTaskRunner', () => {
let mockTaskInstance: TaskInstance;
let mockKbnServer: HapiServer;
beforeEach(() => {
mockTaskInstance = getMockTaskInstance();
mockKbnServer = getMockKbnServer();
});
describe('Error handling', () => {
test('catches its own errors', async () => {
const mockCallWithInternal = () => Promise.reject(new Error('Things did not go well!'));
mockKbnServer = getMockKbnServer(mockCallWithInternal);
const runner = visualizationsTaskRunner(mockTaskInstance, { server: mockKbnServer });
const result = await runner();
expect(result).toMatchObject({
error: 'Things did not go well!',
state: {
runs: 1,
stats: undefined,
},
});
});
});
test('Summarizes visualization response data', async () => {
const getNextMidnight = () =>
moment()
.add(1, 'days')
.startOf('day')
.toISOString();
const runner = visualizationsTaskRunner(mockTaskInstance, { server: mockKbnServer });
const result = await runner();
expect(result).toMatchObject({
error: undefined,
runAt: getNextMidnight(),
state: {
runs: 1,
stats: {
shell_beads: {
spaces_avg: 1,
spaces_max: 1,
spaces_min: 1,
total: 1,
},
},
},
});
});
test('Summarizes visualization response data per Space', async () => {
const mockCallWithInternal = getMockCallWithInternal([
// default space
{
_id: 'visualization:coolviz-123',
_source: {
type: 'visualization',
visualization: { visState: '{"type": "cave_painting"}' },
},
},
{
_id: 'visualization:coolviz-456',
_source: {
type: 'visualization',
visualization: { visState: '{"type": "printing_press"}' },
},
},
{
_id: 'meat:visualization:coolviz-789',
_source: { type: 'visualization', visualization: { visState: '{"type": "floppy_disk"}' } },
},
// meat space
{
_id: 'meat:visualization:coolviz-789',
_source: {
type: 'visualization',
visualization: { visState: '{"type": "cave_painting"}' },
},
},
{
_id: 'meat:visualization:coolviz-789',
_source: { type: 'visualization', visualization: { visState: '{"type": "cuneiform"}' } },
},
{
_id: 'meat:visualization:coolviz-789',
_source: { type: 'visualization', visualization: { visState: '{"type": "cuneiform"}' } },
},
{
_id: 'meat:visualization:coolviz-789',
_source: { type: 'visualization', visualization: { visState: '{"type": "floppy_disk"}' } },
},
// cyber space
{
_id: 'cyber:visualization:coolviz-789',
_source: { type: 'visualization', visualization: { visState: '{"type": "floppy_disk"}' } },
},
{
_id: 'cyber:visualization:coolviz-789',
_source: { type: 'visualization', visualization: { visState: '{"type": "floppy_disk"}' } },
},
{
_id: 'cyber:visualization:coolviz-123',
_source: {
type: 'visualization',
visualization: { visState: '{"type": "cave_painting"}' },
},
},
]);
mockKbnServer = getMockKbnServer(mockCallWithInternal);
const runner = visualizationsTaskRunner(mockTaskInstance, { server: mockKbnServer });
const result = await runner();
expect(result).toMatchObject({
error: undefined,
state: {
runs: 1,
stats: {
cave_painting: { total: 3, spaces_min: 1, spaces_max: 1, spaces_avg: 1 },
printing_press: { total: 1, spaces_min: 1, spaces_max: 1, spaces_avg: 1 },
cuneiform: { total: 2, spaces_min: 2, spaces_max: 2, spaces_avg: 2 },
floppy_disk: { total: 4, spaces_min: 2, spaces_max: 2, spaces_avg: 2 },
},
},
});
});
});

View file

@ -0,0 +1,107 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import _, { countBy, groupBy, mapValues } from 'lodash';
import {
ESQueryResponse,
HapiServer,
SavedObjectDoc,
TaskInstance,
VisState,
Visualization,
} from '../../../../';
import { getNextMidnight } from '../../get_next_midnight';
interface VisSummary {
type: string;
space: string;
}
/*
* Parse the response data into telemetry payload
*/
async function getStats(callCluster: (method: string, params: any) => Promise<any>, index: string) {
const searchParams = {
size: 10000, // elasticsearch index.max_result_window default value
index,
ignoreUnavailable: true,
filterPath: ['hits.hits._id', 'hits.hits._source.visualization'],
body: {
query: {
bool: { filter: { term: { type: 'visualization' } } },
},
},
};
const esResponse: ESQueryResponse = await callCluster('search', searchParams);
const size = _.get(esResponse, 'hits.hits.length');
if (size < 1) {
return;
}
// `map` to get the raw types
const visSummaries: VisSummary[] = esResponse.hits.hits.map((hit: SavedObjectDoc) => {
const spacePhrases: string[] = hit._id.split(':');
const space = spacePhrases.length === 3 ? spacePhrases[0] : 'default'; // if in a custom space, the format of a saved object ID is space:type:id
const visualization: Visualization = _.get(hit, '_source.visualization', { visState: '{}' });
const visState: VisState = JSON.parse(visualization.visState);
return {
type: visState.type || '_na_',
space,
};
});
// organize stats per type
const visTypes = groupBy(visSummaries, 'type');
// get the final result
return mapValues(visTypes, curr => {
const total = curr.length;
const spacesBreakdown = countBy(curr, 'space');
const spaceCounts: number[] = _.values(spacesBreakdown);
return {
total,
spaces_min: _.min(spaceCounts),
spaces_max: _.max(spaceCounts),
spaces_avg: total / spaceCounts.length,
};
});
}
export function visualizationsTaskRunner(
taskInstance: TaskInstance,
kbnServer: { server: HapiServer }
) {
const { server } = kbnServer;
const { callWithInternalUser: callCluster } = server.plugins.elasticsearch.getCluster('data');
const config = server.config();
const index = config.get('kibana.index').toString(); // cast to string for TypeScript
return async () => {
let stats;
let error;
try {
stats = await getStats(callCluster, index);
} catch (err) {
if (err.constructor === Error) {
error = err.message;
} else {
error = err;
}
}
return {
runAt: getNextMidnight(),
state: {
runs: taskInstance.state.runs + 1,
stats,
},
error,
};
};
}

View file

@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { ESQueryResponse, HapiServer, SavedObjectDoc, TaskInstance } from '../';
export const getMockTaskInstance = (): TaskInstance => ({ state: { runs: 0, stats: {} } });
const defaultMockSavedObjects = [
{
_id: 'visualization:coolviz-123',
_source: {
type: 'visualization',
visualization: { visState: '{"type": "shell_beads"}' },
},
},
];
const defaultMockTaskDocs = [getMockTaskInstance()];
export const getMockCallWithInternal = (hits: SavedObjectDoc[] = defaultMockSavedObjects) => {
return (): Promise<ESQueryResponse> => {
return Promise.resolve({ hits: { hits } });
};
};
export const getMockTaskFetch = (docs: TaskInstance[] = defaultMockTaskDocs) => {
return () => Promise.resolve({ docs });
};
export const getMockKbnServer = (
mockCallWithInternal = getMockCallWithInternal(),
mockTaskFetch = getMockTaskFetch()
): HapiServer => ({
taskManager: {
registerTaskDefinitions: (opts: any) => undefined,
schedule: (opts: any) => Promise.resolve(),
fetch: mockTaskFetch,
},
plugins: {
elasticsearch: {
getCluster: (cluster: string) => ({
callWithInternalUser: mockCallWithInternal,
}),
},
xpack_main: {},
},
usage: {
collectorSet: {
makeUsageCollector: () => '',
register: () => undefined,
},
},
config: () => ({ get: () => '' }),
});