[CM] Event Stream service (#153211)

## Summary

This PR implements the Event Stream service for Content Management.

For a high-level overview, see:

- [Event Stream technical
summary](https://docs.google.com/document/d/1nyMhb0p4gNV43OVF6cLJkxhMf2V4V1BIjPsnACxe_t0/edit#heading=h.typ7x7sxmeye)
(a bit old, but still a good general overview)

Implementation details in this PR:

- This PR introduces the `EventStreamService` high-level class, which is
the public interface to the Event Stream, holds any necessary state, and
follows plugin life-cycle methods.
- At a lower level, the actual event storage is defined by the
`EventStreamClient` interface.
- There are two `EventStreamClient` implementations:
  - `EsEventStreamClient` is the production implementation, which stores
    events in Elasticsearch.
  - `MemoryEventStreamClient` is used for testing and could be used for
    demo purposes.
- The same test suite, `testEventStreamClient`, is reused for
`EsEventStreamClient` and `MemoryEventStreamClient`, which should help
verify that both implementations work correctly and behave the same. For
`EsEventStreamClient` it runs as a Kibana integration test; for
`MemoryEventStreamClient` it runs as a Jest unit test.
- In `EventStreamService` events are buffered for 250ms or up to 100
events before they are flushed to the storage.
- Events are stored in the `.kibana-event-stream` data stream.
- The data stream and index template are created during the plugin's
"start" life-cycle, similar to how it is done for the Event Log and the
Reporting index.
- The mappings define a `meta` field, which is currently unused, but
will allow adding more fields in the future without needing to change
the schema of the data stream.
- The mappings define a transaction ID (`txId`) field, which can be used
to trace an event back to the original request or to correlate multiple
related events.
- Events are written to Elasticsearch using the `_bulk` request API.
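
For illustration only (not part of this PR's diff), here is a minimal sketch
of how a consumer inside the plugin might queue and read back events. The
`setupEventStreamService()` helper, `addEvent()`, `flush()`, and `tail()` are
introduced by this PR; the import path and the concrete IDs are hypothetical.

```ts
import { setupEventStreamService } from './event_stream/tests/setup_event_stream_service';

async function example() {
  // Service backed by the in-memory client, as in the Jest tests of this PR.
  const { service } = setupEventStreamService();

  // The event is validated, then buffered; the buffer is flushed to storage
  // after 250ms or once 100 events accumulate, whichever comes first.
  service.addEvent({
    subject: ['user', '42'],                   // who performed the action (hypothetical IDs)
    predicate: ['create', { title: 'Hello' }], // event type plus optional payload
    object: ['dashboard', '1234'],             // the content item acted upon
    transaction: 'tx-1',                       // optional: correlates related events
  });

  // Force a flush and read back the latest events (up to 100 by default).
  // Writes are asynchronous, so the tests in this PR poll with `until()`.
  service.flush();
  return await service.tail();
}
```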



### Checklist

Delete any items that are not applicable to this PR.

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios

### For maintainers

- [x] This was checked for breaking API changes and was [labeled
appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)

---------
Co-authored-by: Aleh Zasypkin <aleh.zasypkin@gmail.com>
Co-authored-by: Anton Dosov <anton.dosov@elastic.co>


@ -720,6 +720,7 @@
"JSONStream": "1.3.5",
"abort-controller": "^3.0.0",
"adm-zip": "^0.5.9",
"ajv": "^8.11.0",
"antlr4ts": "^0.5.0-alpha.3",
"archiver": "^5.3.1",
"async": "^3.2.3",
@ -1297,7 +1298,6 @@
"@yarnpkg/lockfile": "^1.1.0",
"abab": "^2.0.4",
"aggregate-error": "^3.1.0",
"ajv": "^8.11.0",
"antlr4ts-cli": "^0.5.0-alpha.3",
"apidoc-markdown": "^7.2.4",
"argsplit": "^1.0.5",


@ -277,4 +277,33 @@ describe('nodeBuilder', () => {
`);
});
});
describe('range method', () => {
const date = new Date(1679741259769);
const dateString = date.toISOString();
test('formats all range operators', () => {
const operators: Array<'gt' | 'gte' | 'lt' | 'lte'> = ['gt', 'gte', 'lt', 'lte'];
for (const operator of operators) {
const nodes = nodeBuilder.range('foo', operator, dateString);
const query = toElasticsearchQuery(nodes);
expect(query).toMatchObject({
bool: {
minimum_should_match: 1,
should: [
{
range: {
foo: {
[operator]: dateString,
},
},
},
],
},
});
}
});
});
});


@ -6,6 +6,7 @@
* Side Public License, v 1.
*/
import type { RangeFilterParams } from '../../filters';
import { KueryNode, nodeTypes } from '../types';
export const nodeBuilder = {
@ -21,4 +22,15 @@ export const nodeBuilder = {
and: (nodes: KueryNode[]): KueryNode => {
return nodes.length > 1 ? nodeTypes.function.buildNode('and', nodes) : nodes[0];
},
range: (
fieldName: string,
operator: keyof Pick<RangeFilterParams, 'gt' | 'gte' | 'lt' | 'lte'>,
value: number | string
) => {
return nodeTypes.function.buildNodeWithArgumentNodes('range', [
nodeTypes.literal.buildNode(fieldName),
operator,
typeof value === 'string' ? nodeTypes.literal.buildNode(value) : value,
]);
},
};


@ -19,7 +19,7 @@ export interface ItemBufferParams<Item> {
* argument which is a list of all buffered items. If `.flush()` is called
* when buffer is empty, `.onflush` is called with empty array.
*/
onFlush: (items: Item[]) => void;
onFlush: (items: Item[]) => void | Promise<void>;
}
/**
@ -60,11 +60,19 @@ export class ItemBuffer<Item> {
}
/**
* Call `.onflush` method and clear buffer.
* Call `.onFlush` method and clear buffer.
*/
public flush() {
this.flushAsync().catch(() => {});
}
/**
* Same as `.flush()` but asynchronous, and returns a promise, which
* rejects if `.onFlush` throws.
*/
public async flushAsync(): Promise<void> {
let list;
[list, this.list] = [this.list, []];
this.params.onFlush(list);
await this.params.onFlush(list);
}
}


@ -41,6 +41,11 @@ export class TimedItemBuffer<Item> extends ItemBuffer<Item> {
super.flush();
}
public async flushAsync() {
clearTimeout(this.timer);
await super.flushAsync();
}
private onTimeout = () => {
this.flush();
};


@ -1,3 +1,22 @@
# Content management
The content management plugin provides functionality to manage content in Kibana.
## Testing
Many parts of the Content Management service are implemented *in-memory*, so
large parts of the Content Management plugin can be tested with Jest tests.
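
Presumably these unit tests can be run with Kibana's standard Jest runner
(the path below is illustrative):
```
yarn test:jest src/plugins/content_management
```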
### Elasticsearch Integration tests
Some functionality of the Content Management plugin can be tested using *Kibana
Integration Tests*, which execute tests against a real Elasticsearch instance.
Run integration tests with:
```
yarn test:jest_integration src/plugins/content_management
```


@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
module.exports = {
preset: '@kbn/test/jest_integration',
rootDir: '../../..',
roots: ['<rootDir>/src/plugins/content_management'],
};


@ -5,6 +5,7 @@
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { loggingSystemMock } from '@kbn/core/server/mocks';
import { Core } from './core';
import { createMemoryStorage } from './mocks';
@ -31,6 +32,8 @@ import type {
SearchItemError,
} from './event_types';
import { ContentTypeDefinition, StorageContext } from './types';
import { until } from '../event_stream/tests/util';
import { setupEventStreamService } from '../event_stream/tests/setup_event_stream_service';
const logger = loggingSystemMock.createLogger();
@ -48,8 +51,13 @@ const setup = ({ registerFooType = false }: { registerFooType?: boolean } = {})
},
};
const core = new Core({ logger });
const eventStream = setupEventStreamService().service;
const core = new Core({
logger,
eventStream,
});
const coreSetup = core.setup();
const contentDefinition: ContentTypeDefinition = {
id: FOO_CONTENT_ID,
storage: createMemoryStorage(),
@ -76,6 +84,7 @@ const setup = ({ registerFooType = false }: { registerFooType?: boolean } = {})
fooContentCrud,
cleanUp,
eventBus: coreSetup.api.eventBus,
eventStream,
};
};
@ -839,6 +848,41 @@ describe('Content Core', () => {
});
});
});
describe('eventStream', () => {
test('stores "delete" events', async () => {
const { fooContentCrud, ctx, eventStream } = setup({ registerFooType: true });
await fooContentCrud!.create(ctx, { title: 'Hello' }, { id: '1234' });
await fooContentCrud!.delete(ctx, '1234');
const findEvent = async () => {
const tail = await eventStream.tail();
for (const event of tail) {
if (
event.predicate[0] === 'delete' &&
event.object &&
event.object[0] === 'foo' &&
event.object[1] === '1234'
) {
return event;
}
}
return null;
};
await until(async () => !!(await findEvent()), 100);
const event = await findEvent();
expect(event).toMatchObject({
predicate: ['delete'],
object: ['foo', '1234'],
});
});
});
});
});
});


@ -5,8 +5,9 @@
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Logger } from '@kbn/core/server';
import { Logger } from '@kbn/core/server';
import { EventStreamService } from '../event_stream';
import { ContentCrud } from './crud';
import { EventBus } from './event_bus';
import { ContentRegistry } from './registry';
@ -25,6 +26,11 @@ export interface CoreApi {
eventBus: EventBus;
}
export interface CoreInitializerContext {
logger: Logger;
eventStream: EventStreamService;
}
export interface CoreSetup {
/** Content registry instance */
contentRegistry: ContentRegistry;
@ -36,7 +42,7 @@ export class Core {
private contentRegistry: ContentRegistry;
private eventBus: EventBus;
constructor({ logger }: { logger: Logger }) {
constructor(private readonly ctx: CoreInitializerContext) {
const contentTypeValidator = (contentType: string) =>
this.contentRegistry?.isContentRegistered(contentType) ?? false;
this.eventBus = new EventBus(contentTypeValidator);
@ -44,6 +50,8 @@ export class Core {
}
setup(): CoreSetup {
this.setupEventStream();
return {
contentRegistry: this.contentRegistry,
api: {
@ -53,4 +61,16 @@ export class Core {
},
};
}
private setupEventStream() {
// TODO: This should be cleaned up and support added for all CRUD events.
this.eventBus.on('deleteItemSuccess', (event) => {
this.ctx.eventStream.addEvent({
// TODO: add "subject" field to event
predicate: ['delete'],
// TODO: the `.contentId` should be easily available on most events.
object: [event.contentTypeId, (event as any).contentId],
});
});
}
}


@ -0,0 +1,50 @@
# Event Stream
## The service
At a high level, the Event Stream is exposed through the `EventStreamService`
class, which is the public interface to the Event Stream; it holds any
necessary state and follows the plugin life-cycle methods.
The service validates events before they are stored and buffers them on
write: events are buffered for 250ms or up to 100 events before they are
flushed to the storage.
## The client
On a lower level the actual event storage is defined in the `EventStreamClient`
interface. There are two `EventStreamClient` implementations:
- `EsEventStreamClient` is the production implementation, which stores events
in Elasticsearch.
- `MemoryEventStreamClient` is used for testing and could be used for demo
purposes.
### The `EsEventStreamClient` client
`EsEventStreamClient` is used in production. It stores events in the
`.kibana-event-stream` data stream. The data stream and index template are
created during plugin initialization "start" life-cycle.
The mappings define `meta` and `indexed` fields, which are reserved for future
schema extensions (so that new fields can be added without mapping changes).
The mappings also define a transaction ID (`txId`) field, which can be used to
trace an event back to the original request or to correlate multiple related
events.
Events are written to Elasticsearch using the `_bulk` request API.
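
As an illustrative sketch (not part of this PR), the filter options exposed by
`EventStreamClient.filter()` can be combined and paginated with the returned
cursor; the helper name and relative import path below are assumptions.

```ts
import type {
  EventStreamClient,
  EventStreamClientFilterOptions,
  EventStreamEvent,
} from '../types';

// Hypothetical helper: fetch recent 'view'/'share' events about one dashboard.
async function recentDashboardEvents(client: EventStreamClient): Promise<EventStreamEvent[]> {
  const options: EventStreamClientFilterOptions = {
    object: [['dashboard', '1']],      // events about this content item
    predicate: ['view', 'share'],      // only these event types
    from: Date.now() - 60 * 60 * 1000, // last hour
    limit: 50,
  };

  const firstPage = await client.filter(options);
  if (!firstPage.cursor) return firstPage.events;

  // A non-empty cursor means more results may be available.
  const nextPage = await client.filter({ ...options, cursor: firstPage.cursor });
  return [...firstPage.events, ...nextPage.events];
}
```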
## Testing
The `MemoryEventStreamClient` can be used to simulate the Event Stream in Jest
unit test environment. Use `setupEventStreamService()` to spin up the service
in the test environment.
The clients themselves can be tested using the `testEventStreamClient` test
suite, which should help verify that both implementations work correctly.
`EsEventStreamClient` is tested using Kibana integration tests, while
`MemoryEventStreamClient` is tested with Jest unit tests.
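
For example, a hypothetical Jest test using the in-memory setup could look
like the sketch below; `setupEventStreamService()` and the `until()` polling
helper are provided by this PR, while the import paths are assumed.

```ts
import { setupEventStreamService } from './tests/setup_event_stream_service';
import { until } from './tests/util';

test('stores and returns an event', async () => {
  const { service } = setupEventStreamService();

  service.addEvent({ predicate: ['view'], object: ['dashboard', '1'] });
  service.flush();

  // Writes are asynchronous, so poll until the event becomes visible.
  await until(async () => (await service.tail()).length === 1, 10);

  expect(await service.tail()).toMatchObject([
    { predicate: ['view'], object: ['dashboard', '1'] },
  ]);
});
```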


@ -0,0 +1,189 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { estypes } from '@elastic/elasticsearch';
import { KueryNode, nodeBuilder, toElasticsearchQuery } from '@kbn/es-query';
import type { EsClient, EsEventStreamEventDto } from './types';
import type {
EventStreamClient,
EventStreamClientFilterOptions,
EventStreamClientFilterResult,
EventStreamEvent,
EventStreamLogger,
} from '../types';
import { EsEventStreamNames } from './es_event_stream_names';
import { EsEventStreamInitializer } from './init/es_event_stream_initializer';
import { eventToDto, dtoToEvent } from './util';
export interface EsEventStreamClientDependencies {
baseName: string;
kibanaVersion: string;
logger: EventStreamLogger;
esClient: Promise<EsClient>;
}
const sort: estypes.Sort = [
{
// By default we always sort by event timestamp descending.
'@timestamp': {
order: 'desc',
},
// Tie breakers for events with the same timestamp.
subjectId: {
order: 'desc',
},
objectId: {
order: 'desc',
},
predicate: {
order: 'desc',
},
},
];
export class EsEventStreamClient implements EventStreamClient {
readonly #names: EsEventStreamNames;
constructor(private readonly deps: EsEventStreamClientDependencies) {
this.#names = new EsEventStreamNames(deps.baseName);
}
public async initialize(): Promise<void> {
const initializer = new EsEventStreamInitializer({
names: this.#names,
kibanaVersion: this.deps.kibanaVersion,
logger: this.deps.logger,
esClient: this.deps.esClient,
});
await initializer.initialize();
}
public async writeEvents(events: EventStreamEvent[]): Promise<void> {
if (events.length === 0) return;
const esClient = await this.deps.esClient;
const operations: Array<estypes.BulkOperationContainer | EsEventStreamEventDto> = [];
for (const event of events) {
const dto = eventToDto(event);
operations.push({ create: {} }, dto);
}
const { errors } = await esClient.bulk(
{
index: this.#names.dataStream,
operations,
},
{
maxRetries: 0,
}
);
if (errors) {
throw new Error('Some events failed to be indexed.');
}
}
public async tail(limit: number = 100): Promise<EventStreamEvent[]> {
return (await this.filter({ limit })).events;
}
public async filter(
options: EventStreamClientFilterOptions
): Promise<EventStreamClientFilterResult> {
const esClient = await this.deps.esClient;
const topLevelNodes: KueryNode[] = [];
if (options.subject && options.subject.length) {
topLevelNodes.push(
nodeBuilder.or(
options.subject.map(([type, id]) =>
!id
? nodeBuilder.is('subjectType', type)
: nodeBuilder.and([
nodeBuilder.is('subjectType', type),
nodeBuilder.is('subjectId', id),
])
)
)
);
}
if (options.object && options.object.length) {
topLevelNodes.push(
nodeBuilder.or(
options.object.map(([type, id]) =>
!id
? nodeBuilder.is('objectType', type)
: nodeBuilder.and([
nodeBuilder.is('objectType', type),
nodeBuilder.is('objectId', id),
])
)
)
);
}
if (options.predicate && options.predicate.length) {
topLevelNodes.push(
nodeBuilder.or(options.predicate.map((type) => nodeBuilder.is('predicate', type)))
);
}
if (options.transaction && options.transaction.length) {
topLevelNodes.push(
nodeBuilder.or(options.transaction.map((id) => nodeBuilder.is('txId', id)))
);
}
if (options.from) {
const from = new Date(options.from).toISOString();
const node = nodeBuilder.range('@timestamp', 'gte', from);
topLevelNodes.push(node);
}
if (options.to) {
const to = new Date(options.to).toISOString();
const node = nodeBuilder.range('@timestamp', 'lte', to);
topLevelNodes.push(node);
}
const query = toElasticsearchQuery(nodeBuilder.and(topLevelNodes));
const size = options.limit ?? 100;
const request: estypes.SearchRequest = {
index: this.#names.dataStream,
query,
sort,
size,
track_total_hits: false,
};
if (options.cursor) {
request.search_after = JSON.parse(options.cursor);
}
const res = await esClient.search<EsEventStreamEventDto>(request);
const events = res.hits.hits.map((hit) => dtoToEvent(hit._source!));
const lastHit = res.hits.hits[res.hits.hits.length - 1];
let cursor: string = '';
if (events.length >= size && lastHit && lastHit.sort) {
cursor = JSON.stringify(lastHit.sort);
}
return {
cursor,
events,
};
}
}


@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { CoreSetup } from '@kbn/core/server';
import type { EventStreamClient, EventStreamClientFactory, EventStreamLogger } from '../types';
import { EsEventStreamClient } from './es_event_stream_client';
export interface EsEventStreamClientFactoryDependencies {
/**
* The prefix used for index names. Usually `.kibana`, as Elasticsearch
* treats indices starting with the `.kibana*` prefix as special indices
* that only Kibana should be allowed to access.
*/
baseName: string;
kibanaVersion: string;
logger: EventStreamLogger;
}
export class EsEventStreamClientFactory implements EventStreamClientFactory {
constructor(private readonly deps: EsEventStreamClientFactoryDependencies) {}
public create(core: CoreSetup): EventStreamClient {
const startServices = core.getStartServices();
return new EsEventStreamClient({
...this.deps,
esClient: startServices.then(([{ elasticsearch }]) => elasticsearch.client.asInternalUser),
});
}
}


@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export class EsEventStreamNames {
public readonly base: string;
public readonly dataStream: string;
public readonly indexPattern: string;
public readonly indexTemplate: string;
constructor(baseName: string) {
const EVENT_STREAM_SUFFIX = `-event-stream`;
const dataStream = `${baseName}${EVENT_STREAM_SUFFIX}`;
this.base = baseName;
this.dataStream = dataStream;
this.indexPattern = `${dataStream}*`;
this.indexTemplate = `${dataStream}-template`;
}
}


@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export {
EsEventStreamClient,
type EsEventStreamClientDependencies,
} from './es_event_stream_client';
export {
EsEventStreamClientFactory,
type EsEventStreamClientFactoryDependencies,
} from './es_event_stream_client_factory';


@ -0,0 +1,127 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import pRetry from 'p-retry';
import { errors } from '@elastic/elasticsearch';
import type { EsClient } from '../types';
import type { EsEventStreamNames } from '../es_event_stream_names';
import type { EventStreamLogger } from '../../types';
import { newIndexTemplateRequest } from './index_template';
export interface EsEventStreamInitializerDependencies {
names: EsEventStreamNames;
kibanaVersion: string;
logger: EventStreamLogger;
esClient: Promise<EsClient>;
}
export class EsEventStreamInitializer {
constructor(private readonly deps: EsEventStreamInitializerDependencies) {}
public async initialize(): Promise<void> {
const createdIndexTemplate = await this.#retry(
this.createIndexTemplateIfNotExists,
'createIndexTemplateIfNotExists'
);
if (createdIndexTemplate) {
await this.#retry(this.createDataStream, 'createDataStream');
}
}
/**
* Calls a function; retries calling it multiple times via p-retry, if it fails.
* Should retry on 2s, 4s, 8s, 16s.
*
* See: https://github.com/tim-kos/node-retry#retryoperationoptions
*
* @param fn Function to retry, if it fails.
*/
readonly #retry = async <R>(fn: () => Promise<R>, fnName: string): Promise<R> => {
this.deps.logger.debug(`Event Stream initialization operation: ${fnName}`);
return await pRetry(fn, {
minTimeout: 1000,
maxTimeout: 1000 * 60 * 3,
retries: 4,
factor: 2,
randomize: true,
onFailedAttempt: (err) => {
const message =
`Event Stream initialization operation failed and will be retried: ${fnName};` +
`${err.retriesLeft} more times; error: ${err.message}`;
this.deps.logger.warn(message);
},
});
};
protected readonly createIndexTemplateIfNotExists = async (): Promise<boolean> => {
const exists = await this.indexTemplateExists();
if (exists) return false;
return await this.createIndexTemplate();
};
protected async indexTemplateExists(): Promise<boolean> {
try {
const esClient = await this.deps.esClient;
const name = this.deps.names.indexTemplate;
const exists = await esClient.indices.existsIndexTemplate({ name });
return !!exists;
} catch (err) {
throw new Error(`error checking existence of index template: ${err.message}`);
}
}
protected async createIndexTemplate(): Promise<boolean> {
try {
const esClient = await this.deps.esClient;
const { indexTemplate, indexPattern } = this.deps.names;
const request = newIndexTemplateRequest({
name: indexTemplate,
indexPatterns: [indexPattern],
kibanaVersion: this.deps.kibanaVersion,
});
await esClient.indices.putIndexTemplate(request);
return true;
} catch (err) {
// The error message doesn't have a type attribute we can look at to guarantee it's due
// to the template already existing (only a long message), so we'll check ourselves to see
// if the template now exists. This scenario would happen if you startup multiple Kibana
// instances at the same time.
const exists = await this.indexTemplateExists();
if (exists) return false;
const error = new Error(`error creating index template: ${err.message}`);
Object.assign(error, { wrapped: err });
throw error;
}
}
protected readonly createDataStream = async (): Promise<void> => {
const esClient = await this.deps.esClient;
const name = this.deps.names.dataStream;
try {
await esClient.indices.createDataStream({
name,
});
} catch (error) {
const alreadyExists =
(error as errors.ResponseError)?.body?.error?.type === 'resource_already_exists_exception';
if (alreadyExists) return;
throw error;
}
};
}


@ -0,0 +1,54 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { estypes } from '@elastic/elasticsearch';
import { mappings } from './mappings';
export interface NewIndexTemplateRequestParams {
name: string;
indexPatterns: string[];
kibanaVersion: string;
}
export const newIndexTemplateRequest = (
params: NewIndexTemplateRequestParams
): estypes.IndicesPutIndexTemplateRequest => {
const version = 1;
const { name, indexPatterns, kibanaVersion } = params;
return {
name,
// This will create the template only if it doesn't exist.
create: true,
// This object is required to make it a data stream template.
data_stream: {
hidden: true,
},
// Our own metadata to keep track of the template.
_meta: {
description: 'This data stream stores events for the Kibana content_management plugin.',
// Template version.
version,
// Kibana version when the template was created.
kibanaVersion,
},
// Setting this to something higher than the default 0 will allow
// defining lower priority templates in the future.
priority: 50,
version,
index_patterns: indexPatterns,
template: {
settings: {
number_of_shards: 1,
auto_expand_replicas: '0-1',
'index.hidden': true,
},
mappings,
},
};
};


@ -0,0 +1,88 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { estypes } from '@elastic/elasticsearch';
export const mappings: estypes.MappingTypeMapping = {
dynamic: false,
properties: {
/**
* Every document indexed to a data stream must contain a `@timestamp`
* field, mapped as a `date` or `date_nanos` field type.
*/
'@timestamp': {
type: 'date',
},
/** Subject is the content item who/which performed the event. */
subjectType: {
type: 'keyword',
ignore_above: 256,
},
subjectId: {
type: 'keyword',
ignore_above: 256,
},
/** Object is the content item on which the event was performed. */
objectType: {
type: 'keyword',
ignore_above: 256,
},
objectId: {
type: 'keyword',
ignore_above: 256,
},
/** The event type. */
predicate: {
type: 'keyword',
ignore_above: 256,
},
/** Custom payload, may be different per event type. */
payload: {
type: 'object',
enabled: false,
dynamic: false,
},
/**
* Transaction ID, which allows tracing the event back to the original
* request or to correlate multiple events. For example, one user action
* can result in multiple events, all of which will have the same `txId`.
*/
txId: {
type: 'keyword',
ignore_above: 256,
},
/**
* Reserved for future extensions. Event Stream client can add custom
* private fields here in the future if needed, without having to update
* the index template mappings.
*/
meta: {
type: 'object',
enabled: false,
dynamic: false,
},
/**
* Reserved for the future extensions, same as the `meta` field, but fields
* added to this object will be indexed.
*
* See dynamic field mapping rules: https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-field-mapping.html
*/
indexed: {
type: 'object',
enabled: true,
dynamic: true,
},
},
};


@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { ElasticsearchClient } from '@kbn/core/server';
import {
createTestServers,
type TestElasticsearchUtils,
type TestKibanaUtils,
} from '@kbn/core-test-helpers-kbn-server';
import { EsEventStreamClient } from '../es_event_stream_client';
import { EsEventStreamNames } from '../es_event_stream_names';
import { EventStreamLoggerMock } from '../../tests/event_stream_logger_mock';
import { testEventStreamClient } from '../../tests/test_event_stream_client';
describe('EsEventStreamClient', () => {
let manageES: TestElasticsearchUtils;
let manageKbn: TestKibanaUtils;
let esClient: ElasticsearchClient;
let resolveClient: (client: EsEventStreamClient) => void = () => {};
const client: Promise<EsEventStreamClient> = new Promise((resolve) => {
resolveClient = resolve;
});
const baseName = '.kibana-test';
const names = new EsEventStreamNames(baseName);
beforeAll(async () => {
const { startES, startKibana } = createTestServers({ adjustTimeout: jest.setTimeout });
manageES = await startES();
manageKbn = await startKibana();
esClient = manageKbn.coreStart.elasticsearch.client.asInternalUser;
resolveClient(
new EsEventStreamClient({
baseName,
esClient: Promise.resolve(esClient),
kibanaVersion: '1.2.3',
logger: new EventStreamLoggerMock(),
})
);
});
afterAll(async () => {
await manageKbn.root.shutdown();
await manageKbn.stop();
await manageES.stop();
});
it('can initialize the Event Stream', async () => {
const exists1 = await esClient.indices.existsIndexTemplate({
name: names.indexTemplate,
});
expect(exists1).toBe(false);
await (await client).initialize();
const exists2 = await esClient.indices.existsIndexTemplate({
name: names.indexTemplate,
});
expect(exists2).toBe(true);
});
it('should return "resource_already_exists_exception" error when data stream already exists', async () => {
try {
await esClient.indices.createDataStream({
name: names.dataStream,
});
throw new Error('Not expected');
} catch (error) {
expect(error.body.error.type).toBe('resource_already_exists_exception');
}
});
testEventStreamClient(client);
});


@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { Client } from '@elastic/elasticsearch';
export type EsClient = Omit<
Client,
'connectionPool' | 'serializer' | 'extend' | 'close' | 'diagnostic'
>;
/**
* Represents a single event as it is stored in Elasticsearch.
*/
export interface EsEventStreamEventDto {
/**
* Time when the event occurred.
*/
'@timestamp': string;
/**
* Type of the subject. Subject is the content item who/which performed the
* event.
*/
subjectType?: string;
/**
* ID of the subject.
*/
subjectId?: string;
/**
* Type of the object. Object is the content item on which the event was
* performed.
*/
objectType?: string;
/**
* ID of the object.
*/
objectId?: string;
/**
* Specifies the event type. Such as `create`, `update`, `delete`, etc.
*/
predicate: string;
/**
* Custom payload, may be different per event type. Provided by the
* event type originator.
*/
payload?: Record<string, unknown>;
/**
* Transaction ID, which allows tracing the event back to the original
* request or to correlate multiple events. For example, one user action
* can result in multiple events, all of which will have the same `txId`.
*/
txId?: string;
/**
* Reserved for future extensions. Custom metadata may be added here by the
* Event Stream implementation.
*/
meta?: Record<string, unknown>;
/**
* Reserved for future extensions. Same as `meta`, but indexed.
*/
indexed?: Record<string, unknown>;
}


@ -0,0 +1,72 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { Mutable } from 'utility-types';
import type { EventStreamEvent } from '../types';
import type { EsEventStreamEventDto } from './types';
export const eventToDto = (event: EventStreamEvent): EsEventStreamEventDto => {
const { time, subject, predicate, object, transaction } = event;
const dto: EsEventStreamEventDto = {
'@timestamp': new Date(time).toISOString(),
predicate: predicate[0],
};
if (subject) {
dto.subjectType = subject[0];
dto.subjectId = subject[1];
}
if (predicate[1]) {
dto.payload = predicate[1];
}
if (object) {
dto.objectType = object[0];
dto.objectId = object[1];
}
if (transaction) {
dto.txId = transaction;
}
return dto;
};
export const dtoToEvent = (dto: EsEventStreamEventDto): EventStreamEvent => {
const {
'@timestamp': timestamp,
subjectType,
subjectId,
predicate,
payload,
objectId,
objectType,
txId,
} = dto;
const event: Mutable<EventStreamEvent> = {
time: new Date(timestamp).getTime(),
predicate: payload ? [predicate, payload] : [predicate],
};
if (subjectType && subjectId) {
event.subject = [subjectType, subjectId];
}
if (objectType && objectId) {
event.object = [objectType, objectId];
}
if (txId) {
event.transaction = txId;
}
return event;
};


@ -0,0 +1,333 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { setupEventStreamService } from './tests/setup_event_stream_service';
const setup = setupEventStreamService;
describe('EventStreamService', () => {
describe('.tail()', () => {
test('returns no events by default', async () => {
const { service } = setup();
service.flush();
const events = await service.tail();
expect(events).toStrictEqual([]);
});
});
describe('validation', () => {
describe('event time', () => {
test('cannot be too far in the future', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
time: 4000000000000,
predicate: ['test'],
},
])
).toThrow();
});
test('cannot be too far in the past', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
time: 1,
predicate: ['test'],
},
])
).toThrow();
});
test('cannot be a float', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
time: Date.now() + 0.5,
predicate: ['test'],
},
])
).toThrow();
});
test('cannot be a string', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
time: String(Date.now()) as any,
predicate: ['test'],
},
])
).toThrow();
});
});
describe('event subject', () => {
test('type cannot be empty', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
subject: ['', '123'],
predicate: ['test'],
},
])
).toThrow();
});
test('type cannot be too long', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
subject: [
'0123456789012345678901234567890123456789012345678901234567890123456789',
'123',
],
predicate: ['test'],
},
])
).toThrow();
});
test('ID cannot be too long', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
subject: [
'dashboard',
'012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789',
],
predicate: ['test'],
},
])
).toThrow();
});
test('cannot be null', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
subject: null as any,
predicate: ['test'],
},
])
).toThrow();
});
});
describe('event predicate', () => {
test('type cannot be missing', async () => {
const { service } = setup();
expect(() => service.addEvents([{} as any])).toThrow();
});
test('type cannot be null', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
predicate: null as any,
},
])
).toThrow();
});
test('type cannot be a number', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
predicate: [123 as any],
},
])
).toThrow();
});
test('type cannot be an empty string', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
predicate: [''],
},
])
).toThrow();
});
test('cannot be a long string', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
predicate: ['0123456789012345678901234567890123456789012345678901234567890123456789'],
},
])
).toThrow();
});
test('can be a short string', async () => {
const { service } = setup();
service.addEvents([
{
predicate: ['view'],
},
]);
});
test('can have attributes', async () => {
const { service } = setup();
service.addEvents([
{
predicate: [
'view',
{
foo: 'bar',
},
],
},
]);
});
test('attributes cannot be empty', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
predicate: ['view', {}],
},
])
).toThrow();
});
test('attributes cannot be null', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
predicate: ['view', null as any],
},
])
).toThrow();
});
});
describe('event object', () => {
test('type cannot be empty', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
object: ['', '123'],
predicate: ['test'],
},
])
).toThrow();
});
test('type cannot be too long', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
object: [
'0123456789012345678901234567890123456789012345678901234567890123456789',
'123',
],
predicate: ['test'],
},
])
).toThrow();
});
test('ID cannot be too long', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
object: [
'dashboard',
'012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789',
],
predicate: ['test'],
},
])
).toThrow();
});
test('cannot be null', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
object: null as any,
predicate: ['test'],
},
])
).toThrow();
});
});
describe('event transaction', () => {
test('can be missing', async () => {
const { service } = setup();
service.addEvents([
{
predicate: ['test'],
},
]);
});
test('cannot be empty', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
predicate: ['test'],
transaction: '',
},
])
).toThrow();
});
test('cannot be too long', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
predicate: ['test'],
transaction:
'012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789',
},
])
).toThrow();
});
test('cannot be an integer', async () => {
const { service } = setup();
expect(() =>
service.addEvents([
{
predicate: ['test'],
transaction: 123 as any,
},
])
).toThrow();
});
});
});
});


@ -0,0 +1,168 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { CoreSetup } from '@kbn/core/server';
import { TimedItemBuffer } from '@kbn/bfetch-plugin/common';
import type {
EventStreamClient,
EventStreamClientFactory,
EventStreamClientFilterOptions,
EventStreamClientFilterResult,
EventStreamEvent,
EventStreamEventPartial,
EventStreamLogger,
} from './types';
import { partialEventValidator } from './validation';
export interface EventStreamInitializerContext {
logger: EventStreamLogger;
clientFactory: EventStreamClientFactory;
}
export interface EventStreamSetup {
core: CoreSetup;
}
export class EventStreamService {
protected client?: EventStreamClient;
readonly #buffer: TimedItemBuffer<EventStreamEvent>;
constructor(private readonly ctx: EventStreamInitializerContext) {
this.#buffer = new TimedItemBuffer<EventStreamEvent>({
flushOnMaxItems: 100,
maxItemAge: 250,
onFlush: async (events: EventStreamEvent[]) => {
const { logger } = this.ctx;
if (!this.client) {
logger.error('EventStreamClient is not initialized, events will not be written.');
return;
}
try {
await this.client.writeEvents(events);
} catch (error) {
logger.error('Failed to write events to Event Stream.');
logger.error(error);
}
},
});
}
/** Called during "setup" plugin life-cycle. */
public setup({ core }: EventStreamSetup): void {
this.client = this.ctx.clientFactory.create(core);
}
/** Called during "start" plugin life-cycle. */
public start(): void {
const { logger } = this.ctx;
if (!this.client) throw new Error('EventStreamClient not initialized.');
logger.debug('Initializing Event Stream.');
this.client
.initialize()
.then(() => {
logger.debug('Event Stream was initialized.');
})
.catch((error) => {
logger.error('Failed to initialize Event Stream. Events will not be indexed.');
logger.error(error);
});
}
/** Called during "stop" plugin life-cycle. */
public async stop(): Promise<void> {
await this.#buffer.flushAsync();
}
#getClient(): EventStreamClient {
if (!this.client) throw new Error('EventStreamClient not initialized.');
return this.client;
}
/**
* Validates a single event. Throws an error if the event is invalid.
*
* @param event A partial event to validate.
*/
protected validatePartialEvent(event: EventStreamEventPartial): void {
partialEventValidator(event);
if (partialEventValidator.errors) {
const error = partialEventValidator.errors[0];
if (!error) throw new Error('Validation failed.');
throw new Error(`Validation error at [path = ${error.instancePath}]: ${error.message}`);
}
}
/**
* Queues an event to be written to the Event Stream. The event is appended to
* a buffer and written to the Event Stream periodically.
*
* Events are flushed once the buffer reaches 100 items or 250ms has passed,
* whichever comes first. To force a flush, call `.flush()`.
*
* @param event Event to add to the Event Stream.
*/
public addEvent(event: EventStreamEventPartial): void {
this.validatePartialEvent(event);
const completeEvent: EventStreamEvent = {
...event,
time: event.time || Date.now(),
};
this.#buffer.write(completeEvent);
}
/**
* Same as `.addEvent()` but accepts an array of events.
*
* @param events Events to add to the Event Stream.
*/
public addEvents(events: EventStreamEventPartial[]): void {
for (const event of events) {
this.addEvent(event);
}
}
/**
* Flushes the event buffer, writing all events to the Event Stream.
*/
public flush(): void {
this.#buffer.flush();
}
/**
* Read latest events from the Event Stream.
*
* @param limit Number of events to return. Defaults to 100.
* @returns Latest events from the Event Stream.
*/
public async tail(limit: number = 100): Promise<EventStreamEvent[]> {
const client = this.#getClient();
return await client.tail(limit);
}
/**
* Retrieves events from the Event Stream which match the specified filter
* options.
*
* @param options Filtering options.
* @returns Paginated results of events matching the filter.
*/
public async filter(
options: EventStreamClientFilterOptions
): Promise<EventStreamClientFilterResult> {
const client = this.#getClient();
return await client.filter(options);
}
}


@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export * from './types';
export * from './es';
export { EventStreamService } from './event_stream_service';


@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export { MemoryEventStreamClient } from './memory_event_stream_client';
export { MemoryEventStreamClientFactory } from './memory_event_stream_client_factory';


@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { MemoryEventStreamClient } from './memory_event_stream_client';
import { testEventStreamClient } from '../tests/test_event_stream_client';
const client = new MemoryEventStreamClient();
describe('MemoryEventStreamClient', () => {
testEventStreamClient(Promise.resolve(client));
});


@ -0,0 +1,123 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type {
EventStreamClient,
EventStreamClientFilterOptions,
EventStreamClientFilterResult,
EventStreamEvent,
} from '../types';
import { clone } from './util';
/**
* This is an in-memory implementation of the {@link EventStreamClient}
* interface (it does not persist events to Elasticsearch). It is useful for
* testing and demo purposes.
*/
export class MemoryEventStreamClient implements EventStreamClient {
#events: EventStreamEvent[] = [];
public async initialize(): Promise<void> {}
public async writeEvents(events: EventStreamEvent[]): Promise<void> {
for (const event of events) {
this.#events.push(clone(event));
}
this.#events.sort((a, b) => b.time - a.time);
}
public async tail(limit: number = 100): Promise<EventStreamEvent[]> {
const tail = this.#events.slice(0, limit);
return tail.map(clone);
}
public async filter(
options: EventStreamClientFilterOptions
): Promise<EventStreamClientFilterResult> {
let events: EventStreamEvent[] = [...this.#events];
const { subject, object, predicate, transaction, from, to } = options;
if (subject && subject.length) {
events = events.filter((event) => {
if (!event.subject) {
return false;
}
return subject.some(([type, id]) => {
if (!id) return type === event.subject![0];
return type === event.subject![0] && id === event.subject![1];
});
});
}
if (object && object.length) {
events = events.filter((event) => {
if (!event.object) {
return false;
}
return object.some(([type, id]) => {
if (!id) return type === event.object![0];
return type === event.object![0] && id === event.object![1];
});
});
}
if (predicate && predicate.length) {
events = events.filter((event) => {
if (!event.predicate) {
return false;
}
return predicate.some((type) => {
if (type && type !== event.predicate![0]) {
return false;
}
return true;
});
});
}
if (transaction && transaction.length) {
events = events.filter((event) => {
return !event.transaction ? false : transaction.some((id) => event.transaction === id);
});
}
if (from) {
events = events.filter((event) => event.time >= from);
}
if (to) {
events = events.filter((event) => event.time <= to);
}
const size = options.limit ?? 100;
const offset = options.cursor ? JSON.parse(options.cursor) : 0;
events = events.slice(offset);
if (events.length > size) {
events = events.slice(0, size);
}
let cursor: string = '';
if (events.length >= size) {
cursor = JSON.stringify(offset + size);
}
return {
cursor,
events: events.map(clone),
};
}
}


@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { EventStreamClient, EventStreamClientFactory } from '../types';
import { MemoryEventStreamClient } from './memory_event_stream_client';
export class MemoryEventStreamClientFactory implements EventStreamClientFactory {
public create(): EventStreamClient {
return new MemoryEventStreamClient();
}
}


@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export const clone = (x: unknown) => JSON.parse(JSON.stringify(x));


@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { EventStreamLogger } from '../types';
export class EventStreamLoggerMock implements EventStreamLogger {
public debug = jest.fn();
public info = jest.fn();
public warn = jest.fn();
public error = jest.fn();
}


@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { type CoreSetup } from '@kbn/core/server';
import { coreMock } from '@kbn/core/server/mocks';
import { EventStreamService } from '../event_stream_service';
import { EventStreamLoggerMock } from './event_stream_logger_mock';
import { MemoryEventStreamClientFactory } from '../memory';
export const setupEventStreamService = (kibanaCoreSetup: CoreSetup = coreMock.createSetup()) => {
const logger = new EventStreamLoggerMock();
const clientFactory = new MemoryEventStreamClientFactory();
const service = new EventStreamService({
logger,
clientFactory,
});
service.setup({ core: kibanaCoreSetup });
service.start();
return {
logger,
clientFactory,
service,
};
};


@ -0,0 +1,410 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { until } from './util';
import { EventStreamClient, EventStreamEvent } from '../types';
export const testEventStreamClient = (clientPromise: Promise<EventStreamClient>) => {
let now = Date.now();
const getTime = () => now++;
const items: EventStreamEvent[] = [
{
predicate: ['test', { foo: 'bar' }],
time: getTime(),
},
{
time: getTime(),
subject: ['user', '1'],
predicate: ['create', { foo: 'bar' }],
object: ['dashboard', '1'],
},
{
time: getTime(),
subject: ['user', '2'],
predicate: ['view'],
object: ['map', 'xyz'],
},
{
time: getTime(),
subject: ['user', '2'],
predicate: ['view'],
object: ['canvas', 'xxxx-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'],
},
{
time: getTime(),
subject: ['user', '55'],
predicate: [
'share',
{
foo: 'bar',
baz: 'qux',
nested: {
value: 123,
},
},
],
object: ['dashboard', '1'],
},
{
time: getTime(),
subject: ['user', '1'],
predicate: ['view'],
object: ['map', 'xyz'],
},
{
time: getTime(),
subject: ['user', '2'],
predicate: ['view'],
object: ['canvas', 'yyyy-yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy'],
},
];
describe('.writeEvents()', () => {
it('can write a single event', async () => {
const client = await clientPromise;
await client.writeEvents([items[0]]);
await until(async () => {
const events = await client.tail();
return events.length === 1;
}, 100);
const tail = await client.tail();
expect(tail).toMatchObject([items[0]]);
});
it('can write multiple events', async () => {
const client = await clientPromise;
const events: EventStreamEvent[] = [items[1], items[2], items[3]];
await client.writeEvents(events);
await until(async () => {
const tail = await client.tail();
return tail.length === 4;
}, 100);
const tail = await client.tail();
expect(tail.slice(0, 3)).toMatchObject([events[2], events[1], events[0]]);
});
});
describe('.tail()', () => {
it('can limit events to last 2', async () => {
const client = await clientPromise;
const events: EventStreamEvent[] = [items[4], items[5], items[6]];
await client.writeEvents(events);
await until(async () => {
const tail = await client.tail();
return tail.length === 7;
}, 100);
const tail = await client.tail(2);
expect(tail.length).toBe(2);
expect(tail).toMatchObject([events[2], events[1]]);
});
});
describe('.filter()', () => {
it('can fetch all events, cursor is empty', async () => {
const result = await (await clientPromise).filter({});
// console.log(JSON.stringify(result, null, 2));
expect(result.cursor).toBe('');
expect(result.events.length).toBe(7);
expect(result.events).toMatchObject(items.slice(0, 7).sort((a, b) => b.time - a.time));
});
it('can paginate through results', async () => {
const client = await clientPromise;
const result1 = await client.filter({ limit: 3, cursor: '' });
const result2 = await client.filter({ limit: 3, cursor: result1.cursor });
const result3 = await client.filter({ limit: 3, cursor: result2.cursor });
expect(!!result1.cursor).toBe(true);
expect(!!result2.cursor).toBe(true);
expect(!!result3.cursor).toBe(false);
expect(result1.events.length).toBe(3);
expect(result2.events.length).toBe(3);
expect(result3.events.length).toBe(1);
});
it('can select all results but one, and then the last result', async () => {
const client = await clientPromise;
const result1 = await client.filter({ limit: 6, cursor: '' });
const result2 = await client.filter({ limit: 106, cursor: result1.cursor });
expect(!!result1.cursor).toBe(true);
expect(!!result2.cursor).toBe(false);
expect(result1.events.length).toBe(6);
expect(result2.events.length).toBe(1);
expect(result2.events).toMatchObject([items[0]]);
});
it('can limit starting time range of results', async () => {
const client = await clientPromise;
const result = await client.filter({
from: items[2].time,
});
expect(result.cursor).toBe('');
expect(result.events.length).toBe(5);
expect(result.events).toMatchObject(items.slice(2, 7).sort((a, b) => b.time - a.time));
});
it('can limit ending time range of results', async () => {
const client = await clientPromise;
const result = await client.filter({
to: items[2].time,
});
expect(result.cursor).toBe('');
expect(result.events.length).toBe(3);
expect(result.events).toMatchObject(items.slice(0, 3).sort((a, b) => b.time - a.time));
});
it('can limit starting and ending time ranges of results', async () => {
const client = await clientPromise;
const result = await client.filter({
from: items[3].time,
to: items[5].time,
});
expect(result.cursor).toBe('');
expect(result.events.length).toBe(3);
expect(result.events).toMatchObject(items.slice(3, 6).sort((a, b) => b.time - a.time));
});
it('can filter results for a single subject', async () => {
const client = await clientPromise;
const result = await client.filter({
subject: [['user', '55']],
});
expect(result.cursor).toBe('');
expect(result.events.length).toBe(1);
expect(result.events[0]).toStrictEqual({
time: expect.any(Number),
subject: ['user', '55'],
predicate: [
'share',
{
foo: 'bar',
baz: 'qux',
nested: {
value: 123,
},
},
],
object: ['dashboard', '1'],
});
});
it('can filter results for multiple subjects', async () => {
const client = await clientPromise;
const result = await client.filter({
subject: [
['user', '55'],
['user', '1'],
],
});
expect(result.cursor).toBe('');
expect(result.events.length).toBe(3);
expect(result.events).toMatchObject([items[5], items[4], items[1]]);
});
it('can filter results for a single object', async () => {
const client = await clientPromise;
const event = items[6];
const result = await client.filter({
object: [event.object!],
});
expect(result.cursor).toBe('');
expect(result.events.length).toBe(1);
expect(result.events[0]).toStrictEqual(event);
});
it('can filter results for two objects', async () => {
const client = await clientPromise;
const result = await client.filter({
object: [
['dashboard', '1'],
['map', 'xyz'],
],
});
expect(result.cursor).toBe('');
expect(result.events.length).toBe(4);
});
it('can filter results by predicate type', async () => {
const client = await clientPromise;
const result = await client.filter({
predicate: ['view'],
});
expect(result.cursor).toBe('');
expect(result.events.length).toBe(4);
expect(result.events).toMatchObject([items[6], items[5], items[3], items[2]]);
});
it('can filter results by multiple predicates', async () => {
const client = await clientPromise;
const result = await client.filter({
predicate: ['create', 'share'],
});
expect(result.cursor).toBe('');
expect(result.events.length).toBe(2);
expect(result.events).toMatchObject([items[4], items[1]]);
});
it('can combine multiple filters using AND clause', async () => {
const client = await clientPromise;
const result = await client.filter({
subject: [['user', '1']],
object: [['dashboard', '1']],
from: items[0].time,
to: items[6].time,
});
expect(result.cursor).toBe('');
expect(result.events.length).toBe(1);
expect(result.events[0]).toMatchObject(items[1]);
});
});
describe('transactions', () => {
it('can associate multiple events with one transaction', async () => {
const client = await clientPromise;
const time = getTime();
const transaction = 'my-transaction-id';
const events: EventStreamEvent[] = [
{
transaction,
time,
subject: ['user', '123'],
predicate: [
'tag',
{
use: ['foo', 'bar'],
disuse: ['baz'],
},
],
object: ['visualization', '666'],
},
{
transaction,
time,
subject: ['visualization', '666'],
predicate: ['use'],
object: ['tag', 'foo'],
},
{
transaction,
time,
subject: ['visualization', '666'],
predicate: ['use'],
object: ['tag', 'bar'],
},
{
transaction,
time,
subject: ['visualization', '666'],
predicate: ['disuse'],
object: ['tag', 'baz'],
},
];
await client.writeEvents(events);
const getEvents = async () => {
return await client.filter({
transaction: [transaction],
});
};
await until(async () => {
const result = await getEvents();
return result.events.length === 4;
}, 100);
const result = await getEvents();
expect(result.events.length).toBe(4);
expect(
result.events.some(
(event) => event.object![0] === 'visualization' && event.object![1] === '666'
)
).toBe(true);
expect(
result.events.some((event) => event.object![0] === 'tag' && event.object![1] === 'foo')
).toBe(true);
expect(
result.events.some((event) => event.object![0] === 'tag' && event.object![1] === 'bar')
).toBe(true);
expect(
result.events.some((event) => event.object![0] === 'tag' && event.object![1] === 'baz')
).toBe(true);
});
it('can filter events by two transactions', async () => {
const client = await clientPromise;
const events: EventStreamEvent[] = [
{
transaction: 'tx-1',
time: getTime(),
subject: ['user', '123'],
predicate: ['do-something'],
object: ['map', '101'],
},
{
transaction: 'tx-1',
time: getTime(),
subject: ['user', '123'],
predicate: ['do-something'],
object: ['canvas', '202'],
},
{
transaction: 'tx-2',
time: getTime(),
subject: ['api-key', 'abc'],
predicate: ['do-something-else'],
object: ['map', '101'],
},
];
await client.writeEvents(events);
const getEvents = async () => {
return await client.filter({
transaction: ['tx-1', 'tx-2'],
});
};
await until(async () => {
const result = await getEvents();
return result.events.length === 3;
}, 100);
const result = await getEvents();
expect(result.events.length).toBe(3);
});
});
};

View file

@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export const tick = (ms: number = 1) => new Promise((r) => setTimeout(r, ms));
export const until = async (check: () => boolean | Promise<boolean>, pollInterval: number = 1) => {
do {
if (await check()) return;
await tick(pollInterval);
} while (true);
};
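// Illustration only, not part of this PR: a minimal usage sketch of the
// `until` helper exported above. The `countEvents` probe is a hypothetical
// stand-in for any async check, such as counting events visible in storage.
let exampleEventCount = 0;
const countEvents = async (): Promise<number> => exampleEventCount;
const waitForThreeEvents = async (): Promise<void> => {
  // Poll every 100 ms until the probe reports three events, mirroring how
  // the tests above wait for buffered writes to become visible.
  await until(async () => (await countEvents()) === 3, 100);
};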

View file

@ -0,0 +1,181 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { CoreSetup } from '@kbn/core/server';
import type { Optional } from 'utility-types';
/**
* Represents a factory which can be used to create an Event Stream client.
*/
export interface EventStreamClientFactory {
/**
* Creates an Event Stream client.
*
* @param core The CoreSetup object provided by the Kibana platform.
*/
create(core: CoreSetup): EventStreamClient;
}
/**
* Represents a storage layer for events.
*/
export interface EventStreamClient {
/**
* Initializes the Event Stream client. This method runs during the plugin's
* `setup` phase and should be used to create any necessary resources.
*/
initialize(): Promise<void>;
/**
* Immediately writes one or more events to the Event Stream using a bulk
* request.
*
* @param events One or more events to write to the Event Stream.
*/
writeEvents: (events: EventStreamEvent[]) => Promise<void>;
/**
* Retrieves the most recent events from the Event Stream.
*
* @param limit The maximum number of events to return. If not specified, the
* default is 100.
*/
tail(limit?: number): Promise<EventStreamEvent[]>;
/**
* Retrieves events from the Event Stream which match the specified filter
* options.
*/
filter: (options: EventStreamClientFilterOptions) => Promise<EventStreamClientFilterResult>;
}
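// Illustration only, not part of this PR: a sketch exercising the
// EventStreamClient contract defined above. How the concrete client is
// constructed is assumed; only the interface methods are used here.
const demoClient = async (client: EventStreamClient): Promise<EventStreamEvent[]> => {
  await client.initialize(); // create backing resources, e.g. a data stream

  await client.writeEvents([
    {
      time: Date.now(),
      subject: ['user', '42'],
      predicate: ['view'],
      object: ['dashboard', '1'],
    },
  ]);

  // Most recent events first; the limit defaults to 100 when omitted.
  return await client.tail(10);
};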
/**
* Represents the options which can be used to filter events from the Event
* Stream. Top-level properties are joined by `AND` logic. For example, if
* `subject` and `predicate` are specified, only events which match both
* criteria will be returned.
*/
export interface EventStreamClientFilterOptions {
/**
* One or more subjects to filter by. Subjects are joined by `OR` logic.
*/
readonly subject?: Array<readonly [type: string, id?: string]>;
/**
* One or more predicates to filter by. Predicates are joined by `OR` logic.
*/
readonly predicate?: string[];
/**
* One or more objects to filter by. Objects are joined by `OR` logic.
*/
readonly object?: Array<readonly [type: string, id?: string]>;
/**
* One or more transaction IDs to filter by. Transactions are joined by `OR`
* logic.
*/
readonly transaction?: string[];
/**
* The starting time to filter by. Events which occurred after this time will
* be returned. If not specified, the default is the beginning of time.
*/
readonly from?: number;
/**
* The ending time to filter by. Events which occurred before this time will
* be returned. If not specified, the default is the current time.
*/
readonly to?: number;
/**
* The maximum number of events to return. If not specified, the default is
* 100.
*/
readonly limit?: number;
/**
* A cursor which can be used to retrieve the next page of results. On the
* first call, this should be `undefined` or an empty string. On subsequent
* calls, this should be the value of the `cursor` property returned by the
* previous call in the {@link EventStreamClientFilterResult} object.
*/
readonly cursor?: string;
}
/**
* Represents the result of a `.filter()` operation.
*/
export interface EventStreamClientFilterResult {
/**
* A cursor which can be used to retrieve the next page of results. Should be
* treated as an opaque value. When empty, there are no more results.
*/
readonly cursor: string;
/**
* The list of events which matched the filter. Sorted by time in descending
* order.
*/
readonly events: EventStreamEvent[];
}
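// Illustration only, not part of this PR: the cursor contract described
// above implies a simple pagination loop. This sketch assumes some `client`
// implementing EventStreamClient.
const fetchAllEvents = async (client: EventStreamClient): Promise<EventStreamEvent[]> => {
  const all: EventStreamEvent[] = [];
  let cursor = '';
  do {
    // An empty cursor requests the first page; subsequent calls pass the
    // cursor returned with the previous page.
    const page = await client.filter({ limit: 100, cursor });
    all.push(...page.events);
    cursor = page.cursor;
  } while (cursor !== '');
  return all;
};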
/**
* Represents a single event in the Event Stream. Events can be thought of as
* "Semantic triples" (see https://en.wikipedia.org/wiki/Semantic_triple).
* Semantic triples have a subject, a predicate, and an object. In the context
* of the Event Stream, the subject is the content item who/which performed the
* event, the predicate is the event type (such as `create`, `update`, `delete`,
* etc.), and the object is the content item on which the action was performed.
*/
export interface EventStreamEvent {
/**
* Specifies who performed the event. The subject is a tuple of the type of
* the subject and the ID of the subject.
*/
readonly subject?: readonly [type: string, id: string];
/**
* Specifies the event type, such as `create`, `update`, `delete`, etc.
* The predicate is a tuple of the type of the predicate and any attributes
* associated with the predicate.
*/
readonly predicate: readonly [type: string, attributes?: Record<string, unknown>];
/**
* Specifies the content item on which the event was performed. The object is
* a tuple of the type of the object and the ID of the object.
*/
readonly object?: readonly [type: string, id: string];
/**
* Timestamp in milliseconds since the Unix Epoch when the event occurred.
*/
readonly time: number;
/**
* Transaction ID, which allows tracing the event back to the original
* request, as well as associating multiple related events together.
*/
readonly transaction?: string;
}
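// Illustration only, not part of this PR: a single event expressed as a
// subject-predicate-object triple ("user 42 viewed dashboard 1"); the
// values are arbitrary.
const exampleEvent: EventStreamEvent = {
  time: Date.now(),
  subject: ['user', '42'],
  predicate: ['view'],
  object: ['dashboard', '1'],
  transaction: 'req-abc-123', // optional: correlates this event with others
};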
/**
* Represents a partial version of an EventStreamEvent, as it can be provided
* for ingestion. The `time` property is optional, as it will be set by the
* Event Stream if not provided.
*/
export type EventStreamEventPartial = Optional<EventStreamEvent, 'time'>;
import type { Logger } from '@kbn/core/server';
/**
* Logger interface used in the Event Stream.
*/
export type EventStreamLogger = Pick<Logger, 'debug' | 'error' | 'info' | 'warn'>;

View file

@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import Ajv from 'ajv/dist/2020';
const ajv = new Ajv({
allErrors: false,
strict: false,
verbose: false,
});
const partialEventSchema = {
type: 'object',
required: ['predicate'],
additionalProperties: false,
properties: {
subject: {
type: 'array',
nullable: false,
minItems: 2,
maxItems: 2,
items: false,
prefixItems: [
{
type: 'string',
minLength: 1,
maxLength: 64,
},
{
type: 'string',
maxLength: 255,
},
],
},
predicate: {
type: 'array',
minItems: 1,
maxItems: 2,
items: false,
nullable: false,
prefixItems: [
{
type: 'string',
minLength: 1,
maxLength: 64,
},
{
type: 'object',
additionalProperties: true,
minProperties: 1,
maxProperties: 255,
},
],
},
object: {
type: 'array',
nullable: false,
minItems: 2,
maxItems: 2,
items: false,
prefixItems: [
{
type: 'string',
minLength: 1,
maxLength: 64,
},
{
type: 'string',
maxLength: 255,
},
],
},
time: {
type: 'number',
nullable: false,
multipleOf: 1,
// Just some sane limits so the number doesn't escape too far into the
// future or past.
minimum: 1600000000000, // Sep 2020
maximum: 2600000000000, // May 2052
},
transaction: {
type: 'string',
nullable: false,
minLength: 1,
maxLength: 255,
},
},
};
export const partialEventValidator = ajv.compile(partialEventSchema);
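// Illustration only, not part of this PR: typical usage of the compiled
// validator above. A compiled Ajv validator returns a boolean and exposes a
// populated `errors` property when validation fails.
const candidate: unknown = {
  subject: ['user', '42'],
  predicate: ['view'],
  object: ['dashboard', '1'],
};
if (!partialEventValidator(candidate)) {
  throw new Error(`Invalid event: ${JSON.stringify(partialEventValidator.errors)}`);
}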

View file

@ -6,7 +6,7 @@
* Side Public License, v 1.
*/
import { loggingSystemMock, coreMock } from '@kbn/core/server/mocks';
import { coreMock } from '@kbn/core/server/mocks';
import { ContentManagementPlugin } from './plugin';
import { IRouter } from '@kbn/core/server';
import type { ProcedureName } from '../common';
@ -62,22 +62,29 @@ jest.mock('./rpc/procedures/all_procedures', () => {
});
const setup = () => {
const logger = loggingSystemMock.create();
const { http } = coreMock.createSetup();
const coreSetup = coreMock.createSetup();
const router: IRouter<any> = coreSetup.http.createRouter();
const http = { ...coreSetup.http, createRouter: () => router };
const plugin = new ContentManagementPlugin(coreMock.createPluginInitializerContext());
const router: IRouter<any> = http.createRouter();
router.post = jest.fn();
const plugin = new ContentManagementPlugin({ logger });
return { plugin, http: { createRouter: () => router }, router };
return {
plugin,
http,
router,
coreSetup: {
...coreSetup,
http,
},
};
};
describe('ContentManagementPlugin', () => {
describe('setup()', () => {
test('should expose the core API', () => {
const { plugin, http } = setup();
const api = plugin.setup({ http });
const { plugin, coreSetup } = setup();
const api = plugin.setup(coreSetup);
expect(Object.keys(api).sort()).toEqual(['crud', 'eventBus', 'register']);
expect(api.crud('')).toBe('mockedCrud');
@ -87,8 +94,8 @@ describe('ContentManagementPlugin', () => {
describe('RPC', () => {
test('should create a single POST HTTP route on the router', () => {
const { plugin, http, router } = setup();
plugin.setup({ http });
const { plugin, coreSetup, router } = setup();
plugin.setup(coreSetup);
expect(router.post).toBeCalledTimes(1);
const [routeConfig]: Parameters<IRouter['post']> = (router.post as jest.Mock).mock.calls[0];
@ -97,8 +104,8 @@ describe('ContentManagementPlugin', () => {
});
test('should register all the procedures in the RPC service and the route handler must send to each procedure the core request context + the request body as input', async () => {
const { plugin, http, router } = setup();
plugin.setup({ http });
const { plugin, coreSetup, router } = setup();
plugin.setup(coreSetup);
const [_, handler]: Parameters<IRouter['post']> = (router.post as jest.Mock).mock.calls[0];
@ -139,8 +146,8 @@ describe('ContentManagementPlugin', () => {
});
test('should return error in custom error format', async () => {
const { plugin, http, router } = setup();
plugin.setup({ http });
const { plugin, coreSetup, router } = setup();
plugin.setup(coreSetup);
const [_, handler]: Parameters<IRouter['post']> = (router.post as jest.Mock).mock.calls[0];

View file

@ -21,22 +21,37 @@ import {
ContentManagementServerStart,
SetupDependencies,
} from './types';
import { EventStreamService, EsEventStreamClientFactory } from './event_stream';
import { procedureNames } from '../common/rpc';
type CreateRouterFn = CoreSetup['http']['createRouter'];
export class ContentManagementPlugin
implements Plugin<ContentManagementServerSetup, ContentManagementServerStart, SetupDependencies>
{
private readonly logger: Logger;
private readonly core: Core;
readonly #eventStream: EventStreamService;
constructor(initializerContext: PluginInitializerContext) {
const kibanaVersion = initializerContext.env.packageInfo.version;
constructor(initializerContext: { logger: PluginInitializerContext['logger'] }) {
this.logger = initializerContext.logger.get();
this.core = new Core({ logger: this.logger });
this.#eventStream = new EventStreamService({
logger: this.logger,
clientFactory: new EsEventStreamClientFactory({
baseName: '.kibana',
kibanaVersion,
logger: this.logger,
}),
});
this.core = new Core({
logger: this.logger,
eventStream: this.#eventStream,
});
}
public setup(core: { http: { createRouter: CreateRouterFn } }) {
public setup(core: CoreSetup) {
this.#eventStream.setup({ core });
const { api: coreApi, contentRegistry } = this.core.setup();
const rpc = new RpcService<RpcContext>();
@ -54,6 +69,16 @@ export class ContentManagementPlugin
}
public start(core: CoreStart) {
this.#eventStream.start();
return {};
}
public async stop(): Promise<void> {
try {
await this.#eventStream.stop();
} catch (e) {
this.logger.error(`Error during event stream stop: ${e}`);
}
}
}

View file

@ -8,6 +8,9 @@
"@kbn/core",
"@kbn/config-schema",
"@kbn/core-http-request-handler-context-server",
"@kbn/es-query",
"@kbn/core-test-helpers-kbn-server",
"@kbn/bfetch-plugin",
"@kbn/object-versioning",
],
"exclude": [