[ResponseOps] Cases analytics index (#223405)

This PR is for a feature branch that is being merged into main.

The relevant PRs are:
- https://github.com/elastic/kibana/pull/219211
- https://github.com/elastic/kibana/pull/222820
- https://github.com/elastic/kibana/pull/223241
- https://github.com/elastic/kibana/pull/224388
- https://github.com/elastic/kibana/pull/224682

## Summary

This PR adds 4 new indexes with case analytics data, which are created
when the cases plugin starts.

  - `.internal.cases`
  - `.internal.cases-comments`
  - `.internal.cases-attachments`
  - `.internal.cases-activity`

After the indexes are created, a backfill task for each of them is
scheduled to run 1 minute after creation. This task populates the
indexes with relevant data from `.kibana_alerting_cases`.
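
The backfill task runner itself is not shown in this description, but conceptually it boils down to a `_reindex` from `.kibana_alerting_cases` into the destination index, applying the stored painless script to reshape each document. A minimal sketch of that idea, using the cases index constants from this PR (the helper name, import paths, and the exact task-runner logic are assumptions):

```
import type { ElasticsearchClient } from '@kbn/core/server';
import {
  CAI_CASES_INDEX_NAME,
  CAI_CASES_SOURCE_INDEX,
  CAI_CASES_SOURCE_QUERY,
} from './cases_index/constants';
import { CAI_CASES_INDEX_SCRIPT_ID } from './cases_index/painless_scripts';

// Hypothetical helper: copy matching saved objects from the source index and
// let the stored painless script reshape each document on the way in.
export const backfillCasesIndex = async (esClient: ElasticsearchClient) => {
  await esClient.reindex({
    source: {
      index: CAI_CASES_SOURCE_INDEX, // .kibana_alerting_cases
      query: CAI_CASES_SOURCE_QUERY, // { term: { type: 'cases' } }
    },
    dest: { index: CAI_CASES_INDEX_NAME }, // .internal.cases
    script: { id: CAI_CASES_INDEX_SCRIPT_ID }, // cai_cases_script_1
    wait_for_completion: false,
  });
};
```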

A second type of task, the index synchronization task, is also registered.
Four of these tasks, one for each index, are scheduled to run every 5
minutes. The synchronization tasks populate the indexes with data from
`.kibana_alerting_cases` that was created or updated in the last five
minutes.
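
For reference, each synchronization task builds its source query from the last-sync timestamp, so only recently created or updated saved objects are re-processed. A small sketch of how the cases query from this PR could be exercised (the five-minute value just mirrors the schedule described above; the import path is an assumption):

```
import { getCasesSynchronizationSourceQuery } from './cases_index/constants';

// Only pick up saved objects created or updated since the last run.
const lastSyncAt = new Date(Date.now() - 5 * 60 * 1000);
const query = getCasesSynchronizationSourceQuery(lastSyncAt);
// query: bool.must [ term { type: 'cases' },
//                    bool.should [ range cases.created_at >= lastSyncAt,
//                                  range cases.updated_at >= lastSyncAt ] ]
```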

## How to test

You might want to start Kibana with `--verbose` to see relevant index
messages in the console.

Alternatively (what I normally do), go to `analytics_index.ts`,
`backfill_task_runner.ts`, and `synchronization_task_runner.ts`, and
change the `logDebug` function to call `this.logger.info` instead. This
way you will have less spam in the console.

Every log message starts with the index name between square brackets, so
you can look for `[.internal.cases-` and follow what is happening.
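
For reference, `logDebug` in `analytics_index.ts` looks roughly like the sketch below; switching the call from `this.logger.debug` to `this.logger.info` is the whole change (method shown out of its class for brevity):

```
// analytics_index.ts
public logDebug(message: string) {
  // Originally this.logger.debug(...); use info while testing so the
  // messages show up without --verbose.
  this.logger.info(`[${this.indexName}] ${message}`, {
    tags: ['cai-index-creation', this.indexName],
  });
}
```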

1. You should have some existing case data, so before anything else,
please create some activity, attachments, etc.
2. Add `xpack.cases.analytics.index.enabled: true` to `kibana.dev.yml`
and restart Kibana.
3. Check out [this
branch](https://github.com/elastic/elasticsearch/pull/129414) from the
ES project.
4. Start Elasticsearch with `yarn es source`. This will use the version
of Elasticsearch from the branch above.
5. Wait a bit for the indexes to be created and populated (backfilled).
6. Using the dev tools:
    - Confirm the indexes exist.
    - Check the index mapping. Does it match the one in the code? Is the
      `_meta` field correct? (See the sketch after this list.)
      - `x-pack/platform/plugins/shared/cases/server/cases_analytics/******_index/mappings.ts`
    - Check that the painless scripts match the code.
      - `x-pack/platform/plugins/shared/cases/server/cases_analytics/******_index/painless_scripts.ts`
    - Confirm your existing case data is in the indexes. (See **Queries**
      section below.)
7. Play around with cases. Some examples:
    - Create a case
    - Change status/severity
    - Attach alerts
    - Add files
    - Change category/tags
    - Add comments
    - etc
8. Go to the dev tools again and confirm all this shows up in the
relevant indexes. (See **Queries** section below.)
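
If you prefer scripting the `_meta` check from step 6 instead of eyeballing it in the dev tools, here is a rough sketch (the helper is hypothetical; the expected values come from the cases index constants and painless script id in this PR):

```
import type { ElasticsearchClient } from '@kbn/core/server';

// The _meta block written at index creation should carry the mapping version
// and the id of the stored painless script.
export const assertCasesIndexMeta = async (esClient: ElasticsearchClient) => {
  const response = await esClient.indices.getMapping({ index: '.internal.cases' });
  const meta = response['.internal.cases'].mappings._meta;
  if (meta?.mapping_version !== 1 || meta?.painless_script_id !== 'cai_cases_script_1') {
    throw new Error(`Unexpected _meta: ${JSON.stringify(meta)}`);
  }
};
```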

## Queries

In addition to the ones below, I have a few more: things like reindexing
with specific scripts or fetching relevant data from
`.kibana_alerting_cases`. Ping me if you want those queries.

### Checking index content
```
GET /.internal.cases/_search
GET /.internal.cases-comments/_search
GET /.internal.cases-attachments/_search
GET /.internal.cases-activity/_search
```

### Checking index mappings
```
GET /.internal.cases
GET /.internal.cases-comments
GET /.internal.cases-attachments
GET /.internal.cases-activity
```

### Fetching the painless scripts
```
GET /_scripts/cai_cases_script_1
GET /_scripts/cai_attachments_script_1
GET /_scripts/cai_comments_script_1
GET /_scripts/cai_activity_script_1
```

### Emptying the indexes

It is sometimes useful for testing.
```
POST /.internal.cases/_delete_by_query
{ "query": { "match_all": {} } }
POST /.internal.cases-comments/_delete_by_query
{ "query": { "match_all": {} } }
POST /.internal.cases-attachments/_delete_by_query
{ "query": { "match_all": {} } }
POST /.internal.cases-activity/_delete_by_query
{ "query": { "match_all": {} } }
```

### Deleting the indexes

It is sometimes useful for testing.
```
DELETE /.internal.cases
DELETE /.internal.cases-comments
DELETE /.internal.cases-attachments
DELETE /.internal.cases-activity
```

## Release notes

Four dedicated case analytics indexes were created, allowing users to
build dashboards and metrics over case data. These indexes are created
on Kibana startup and updated periodically with cases, comments,
attachments, and activity data.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
Co-authored-by: Christos Nasikas <christos.nasikas@elastic.co>

@@ -0,0 +1,119 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
import { ALERTING_CASES_SAVED_OBJECT_INDEX } from '@kbn/core-saved-objects-server';
export const CAI_ACTIVITY_INDEX_NAME = '.internal.cases-activity';
export const CAI_ACTIVITY_INDEX_ALIAS = '.cases-activity';
export const CAI_ACTIVITY_INDEX_VERSION = 1;
export const CAI_ACTIVITY_SOURCE_QUERY: QueryDslQueryContainer = {
bool: {
must: [
{
term: {
type: 'cases-user-actions',
},
},
{
bool: {
should: [
{
term: {
'cases-user-actions.type': 'severity',
},
},
{
term: {
'cases-user-actions.type': 'delete_case',
},
},
{
term: {
'cases-user-actions.type': 'category',
},
},
{
term: {
'cases-user-actions.type': 'status',
},
},
{
term: {
'cases-user-actions.type': 'tags',
},
},
],
minimum_should_match: 1,
},
},
],
},
};
export const CAI_ACTIVITY_SOURCE_INDEX = ALERTING_CASES_SAVED_OBJECT_INDEX;
export const CAI_ACTIVITY_BACKFILL_TASK_ID = 'cai_activity_backfill_task';
export const CAI_ACTIVITY_SYNCHRONIZATION_TASK_ID = 'cai_cases_activity_synchronization_task';
export const getActivitySynchronizationSourceQuery = (
lastSyncAt: Date
): QueryDslQueryContainer => ({
bool: {
must: [
{
term: {
type: 'cases-user-actions',
},
},
{
range: {
'cases-user-actions.created_at': {
gte: lastSyncAt.toISOString(),
},
},
},
{
bool: {
should: [
{
term: {
'cases-user-actions.type': 'severity',
},
},
{
term: {
'cases-user-actions.type': 'delete_case',
},
},
{
term: {
'cases-user-actions.type': 'category',
},
},
{
term: {
'cases-user-actions.type': 'status',
},
},
{
term: {
'cases-user-actions.type': 'tags',
},
},
],
minimum_should_match: 1,
},
},
],
},
});

@@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server';
import { AnalyticsIndex } from '../analytics_index';
import {
CAI_ACTIVITY_INDEX_NAME,
CAI_ACTIVITY_INDEX_ALIAS,
CAI_ACTIVITY_INDEX_VERSION,
CAI_ACTIVITY_SOURCE_INDEX,
CAI_ACTIVITY_SOURCE_QUERY,
CAI_ACTIVITY_BACKFILL_TASK_ID,
CAI_ACTIVITY_SYNCHRONIZATION_TASK_ID,
} from './constants';
import { CAI_ACTIVITY_INDEX_MAPPINGS } from './mappings';
import { CAI_ACTIVITY_INDEX_SCRIPT, CAI_ACTIVITY_INDEX_SCRIPT_ID } from './painless_scripts';
import { scheduleCAISynchronizationTask } from '../tasks/synchronization_task';
export const createActivityAnalyticsIndex = ({
esClient,
logger,
isServerless,
taskManager,
}: {
esClient: ElasticsearchClient;
logger: Logger;
isServerless: boolean;
taskManager: TaskManagerStartContract;
}): AnalyticsIndex =>
new AnalyticsIndex({
logger,
esClient,
isServerless,
taskManager,
indexName: CAI_ACTIVITY_INDEX_NAME,
indexAlias: CAI_ACTIVITY_INDEX_ALIAS,
indexVersion: CAI_ACTIVITY_INDEX_VERSION,
mappings: CAI_ACTIVITY_INDEX_MAPPINGS,
painlessScriptId: CAI_ACTIVITY_INDEX_SCRIPT_ID,
painlessScript: CAI_ACTIVITY_INDEX_SCRIPT,
taskId: CAI_ACTIVITY_BACKFILL_TASK_ID,
sourceIndex: CAI_ACTIVITY_SOURCE_INDEX,
sourceQuery: CAI_ACTIVITY_SOURCE_QUERY,
});
export const scheduleActivityAnalyticsSyncTask = ({
taskManager,
logger,
}: {
taskManager: TaskManagerStartContract;
logger: Logger;
}) => {
scheduleCAISynchronizationTask({
taskId: CAI_ACTIVITY_SYNCHRONIZATION_TASK_ID,
sourceIndex: CAI_ACTIVITY_SOURCE_INDEX,
destIndex: CAI_ACTIVITY_INDEX_NAME,
taskManager,
logger,
}).catch((e) => {
logger.error(
`Error scheduling ${CAI_ACTIVITY_SYNCHRONIZATION_TASK_ID} task, received ${e.message}`
);
});
};

@@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types';
export const CAI_ACTIVITY_INDEX_MAPPINGS: MappingTypeMapping = {
dynamic: false,
properties: {
'@timestamp': {
type: 'date',
},
case_id: {
type: 'keyword',
},
action: {
type: 'keyword',
},
type: {
type: 'keyword',
},
payload: {
properties: {
status: {
type: 'keyword',
},
tags: {
type: 'keyword',
},
category: {
type: 'keyword',
},
severity: {
type: 'keyword',
},
},
},
created_at: {
type: 'date',
},
created_at_ms: {
type: 'long',
},
created_by: {
properties: {
username: {
type: 'keyword',
},
profile_uid: {
type: 'keyword',
},
full_name: {
type: 'keyword',
},
email: {
type: 'keyword',
},
},
},
owner: {
type: 'keyword',
},
space_ids: {
type: 'keyword',
},
},
};

@@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { StoredScript } from '@elastic/elasticsearch/lib/api/types';
import { CAI_ACTIVITY_INDEX_VERSION } from './constants';
export const CAI_ACTIVITY_INDEX_SCRIPT_ID = `cai_activity_script_${CAI_ACTIVITY_INDEX_VERSION}`;
export const CAI_ACTIVITY_INDEX_SCRIPT: StoredScript = {
lang: 'painless',
source: `
def source = [:];
source.putAll(ctx._source);
ctx._source.clear();
ctx._source.action = source["cases-user-actions"].action;
ctx._source.type = source["cases-user-actions"].type;
long milliSinceEpoch = new Date().getTime();
Instant instant = Instant.ofEpochMilli(milliSinceEpoch);
ctx._source['@timestamp'] = ZonedDateTime.ofInstant(instant, ZoneId.of('Z'));
ZonedDateTime zdt_created =
ZonedDateTime.parse(source["cases-user-actions"].created_at);
ctx._source.created_at_ms = zdt_created.toInstant().toEpochMilli();
ctx._source.created_at = source["cases-user-actions"].created_at;
if (source["cases-user-actions"].created_by != null) {
ctx._source.created_by = new HashMap();
ctx._source.created_by.full_name = source["cases-user-actions"].created_by.full_name;
ctx._source.created_by.username = source["cases-user-actions"].created_by.username;
ctx._source.created_by.profile_uid = source["cases-user-actions"].created_by.profile_uid;
ctx._source.created_by.email = source["cases-user-actions"].created_by.email;
}
if (source["cases-user-actions"].payload != null) {
ctx._source.payload = new HashMap();
if (source["cases-user-actions"].type == "severity" && source["cases-user-actions"].payload.severity != null) {
ctx._source.payload.severity = source["cases-user-actions"].payload.severity;
}
if (source["cases-user-actions"].type == "category" && source["cases-user-actions"].payload.category != null) {
ctx._source.payload.category = source["cases-user-actions"].payload.category;
}
if (source["cases-user-actions"].type == "status" && source["cases-user-actions"].payload.status != null) {
ctx._source.payload.status = source["cases-user-actions"].payload.status;
}
if (source["cases-user-actions"].type == "tags" && source["cases-user-actions"].payload.tags != null) {
ctx._source.payload.tags = source["cases-user-actions"].payload.tags;
}
}
if (source.references != null) {
for (item in source.references) {
if (item.type == "cases") {
ctx._source.case_id = item.id;
}
}
}
ctx._source.owner = source["cases-user-actions"].owner;
ctx._source.space_ids = source.namespaces;
`,
};

@@ -0,0 +1,308 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks';
import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks';
import { errors as esErrors } from '@elastic/elasticsearch';
import { AnalyticsIndex } from './analytics_index';
import type {
IndicesCreateResponse,
IndicesPutMappingResponse,
MappingTypeMapping,
QueryDslQueryContainer,
StoredScript,
} from '@elastic/elasticsearch/lib/api/types';
import { fullJitterBackoffFactory } from '../common/retry_service/full_jitter_backoff';
import { scheduleCAIBackfillTask } from './tasks/backfill_task';
jest.mock('../common/retry_service/full_jitter_backoff');
jest.mock('./tasks/backfill_task');
const fullJitterBackoffFactoryMock = fullJitterBackoffFactory as jest.Mock;
const scheduleCAIBackfillTaskMock = scheduleCAIBackfillTask as jest.Mock;
describe('AnalyticsIndex', () => {
const logger = loggingSystemMock.createLogger();
const esClient = elasticsearchServiceMock.createElasticsearchClient();
const taskManager = taskManagerMock.createStart();
const isServerless = false;
const indexName = '.test-index-name';
const indexAlias = '.index-name';
const indexVersion = 1;
const painlessScriptId = 'painless_script_id';
const taskId = 'foobar_task_id';
const sourceIndex = '.source-index';
const painlessScript: StoredScript = {
lang: 'painless',
source: 'ctx._source.remove("foobar");',
};
const mappings: MappingTypeMapping = {
dynamic: false,
properties: {
title: {
type: 'keyword',
},
},
};
const mappingsMeta = {
mapping_version: indexVersion,
painless_script_id: painlessScriptId,
};
const sourceQuery: QueryDslQueryContainer = {
term: {
type: 'cases',
},
};
let index: AnalyticsIndex;
// 1ms delay before retrying
const nextBackOff = jest.fn().mockReturnValue(1);
const backOffFactory = {
create: () => ({ nextBackOff }),
};
beforeEach(() => {
jest.clearAllMocks();
fullJitterBackoffFactoryMock.mockReturnValue(backOffFactory);
index = new AnalyticsIndex({
esClient,
logger,
indexName,
indexAlias,
indexVersion,
isServerless,
mappings,
painlessScript,
painlessScriptId,
sourceIndex,
sourceQuery,
taskId,
taskManager,
});
});
it('checks if the index exists', async () => {
await index.upsertIndex();
expect(esClient.indices.exists).toBeCalledWith({ index: indexName });
});
it('creates index if it does not exist', async () => {
esClient.indices.exists.mockResolvedValueOnce(false);
await index.upsertIndex();
expect(esClient.indices.exists).toBeCalledWith({ index: indexName });
expect(esClient.putScript).toBeCalledWith({ id: painlessScriptId, script: painlessScript });
expect(esClient.indices.create).toBeCalledWith({
index: indexName,
timeout: '300s',
mappings: {
...mappings,
_meta: mappingsMeta,
},
aliases: {
[indexAlias]: {
is_write_index: true,
},
},
settings: {
index: {
auto_expand_replicas: '0-1',
mode: 'lookup',
number_of_shards: 1,
refresh_interval: '15s',
},
},
});
expect(scheduleCAIBackfillTaskMock).toHaveBeenCalledWith({
taskId,
sourceIndex,
sourceQuery,
destIndex: indexName,
taskManager,
logger,
});
});
it('updates index if it exists and the mapping has a lower version number', async () => {
esClient.indices.exists.mockResolvedValueOnce(true);
esClient.indices.getMapping.mockResolvedValueOnce({
[indexName]: {
mappings: {
_meta: {
mapping_version: 0, // lower version number
painless_script_id: painlessScriptId,
},
dynamic: mappings.dynamic,
properties: mappings.properties,
},
},
});
await index.upsertIndex();
expect(esClient.indices.exists).toBeCalledWith({ index: indexName });
expect(esClient.indices.getMapping).toBeCalledWith({ index: indexName });
expect(esClient.putScript).toBeCalledWith({ id: painlessScriptId, script: painlessScript });
expect(esClient.indices.putMapping).toBeCalledWith({
index: indexName,
...mappings,
_meta: mappingsMeta,
});
expect(scheduleCAIBackfillTaskMock).toBeCalledWith({
taskId,
sourceIndex,
sourceQuery,
destIndex: indexName,
taskManager,
logger,
});
});
it('does not update index if it exists and the mapping has a higher version number', async () => {
esClient.indices.exists.mockResolvedValueOnce(true);
esClient.indices.getMapping.mockResolvedValueOnce({
[indexName]: {
mappings: {
_meta: {
mapping_version: 10, // higher version number
painless_script_id: painlessScriptId,
},
dynamic: mappings.dynamic,
properties: mappings.properties,
},
},
});
await index.upsertIndex();
expect(esClient.indices.exists).toBeCalledWith({ index: indexName });
expect(esClient.indices.getMapping).toBeCalledWith({ index: indexName });
expect(esClient.putScript).toBeCalledTimes(0);
expect(esClient.indices.putMapping).toBeCalledTimes(0);
expect(scheduleCAIBackfillTaskMock).toBeCalledTimes(0);
expect(logger.debug).toBeCalledWith(
`[${indexName}] Mapping version is up to date. Skipping update.`,
{ tags: ['cai-index-creation', `${indexName}`] }
);
});
it('does not update index if it exists and the mapping has the same version number', async () => {
esClient.indices.exists.mockResolvedValueOnce(true);
esClient.indices.getMapping.mockResolvedValueOnce({ [indexName]: { mappings } });
await index.upsertIndex();
expect(esClient.indices.exists).toBeCalledWith({ index: indexName });
expect(esClient.indices.getMapping).toBeCalledWith({ index: indexName });
expect(esClient.putScript).toBeCalledTimes(0);
expect(esClient.indices.putMapping).toBeCalledTimes(0);
expect(scheduleCAIBackfillTaskMock).toBeCalledTimes(0);
expect(logger.debug).toBeCalledWith(
`[${indexName}] Mapping version is up to date. Skipping update.`,
{ tags: ['cai-index-creation', `${indexName}`] }
);
});
describe('Error handling', () => {
it('retries if the esClient throws a retryable error', async () => {
esClient.indices.exists
.mockRejectedValueOnce(new esErrors.ConnectionError('My retryable error A'))
.mockRejectedValueOnce(new esErrors.TimeoutError('My retryable error B'))
.mockResolvedValue(true);
await index.upsertIndex();
expect(nextBackOff).toBeCalledTimes(2);
expect(esClient.indices.exists).toBeCalledTimes(3);
expect(esClient.indices.exists).toBeCalledWith({ index: indexName });
});
it('retries if the esClient throws a retryable error when creating an index', async () => {
esClient.indices.exists.mockResolvedValue(false);
esClient.indices.create
.mockRejectedValueOnce(new esErrors.ConnectionError('My retryable error A'))
.mockResolvedValue({} as IndicesCreateResponse);
await index.upsertIndex();
expect(nextBackOff).toBeCalledTimes(1);
expect(esClient.indices.exists).toBeCalledWith({ index: indexName });
expect(esClient.putScript).toBeCalledWith({ id: painlessScriptId, script: painlessScript });
expect(esClient.indices.create).toBeCalledTimes(2);
expect(scheduleCAIBackfillTaskMock).toHaveBeenCalledWith({
taskId,
sourceIndex,
sourceQuery,
destIndex: indexName,
taskManager,
logger,
});
});
it('retries if the esClient throws a retryable error when updating an index', async () => {
esClient.indices.exists.mockResolvedValue(true);
esClient.indices.getMapping.mockResolvedValue({
[indexName]: {
mappings: {
_meta: {
mapping_version: 0, // lower version number
painless_script_id: painlessScriptId,
},
dynamic: mappings.dynamic,
properties: mappings.properties,
},
},
});
esClient.indices.putMapping
.mockRejectedValueOnce(new esErrors.ConnectionError('My retryable error A'))
.mockResolvedValue({} as IndicesPutMappingResponse);
await index.upsertIndex();
expect(nextBackOff).toBeCalledTimes(1);
expect(esClient.indices.exists).toBeCalledWith({ index: indexName });
expect(esClient.indices.getMapping).toBeCalledWith({ index: indexName });
expect(esClient.putScript).toBeCalledWith({ id: painlessScriptId, script: painlessScript });
expect(esClient.indices.putMapping).toBeCalledTimes(2);
expect(esClient.indices.putMapping).toBeCalledWith({
index: indexName,
...mappings,
_meta: mappingsMeta,
});
expect(scheduleCAIBackfillTaskMock).toBeCalledWith({
taskId,
sourceIndex,
sourceQuery,
destIndex: indexName,
taskManager,
logger,
});
});
it('does not retry if the execution throws a non-retryable error', async () => {
esClient.indices.exists.mockRejectedValue(new Error('My terrible error'));
await expect(index.upsertIndex()).resolves.not.toThrow();
expect(nextBackOff).toBeCalledTimes(0);
// Paths in the algorithm after the error are not called.
expect(esClient.indices.getMapping).not.toHaveBeenCalled();
});
});
});

@@ -0,0 +1,254 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import type { errors as EsErrors } from '@elastic/elasticsearch';
import type {
IndicesIndexSettings,
MappingTypeMapping,
QueryDslQueryContainer,
StoredScript,
} from '@elastic/elasticsearch/lib/api/types';
import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server';
import {
CAI_NUMBER_OF_SHARDS,
CAI_AUTO_EXPAND_REPLICAS,
CAI_REFRESH_INTERVAL,
CAI_INDEX_MODE,
CAI_DEFAULT_TIMEOUT,
} from './constants';
import { fullJitterBackoffFactory } from '../common/retry_service/full_jitter_backoff';
import { scheduleCAIBackfillTask } from './tasks/backfill_task';
import { CasesAnalyticsRetryService } from './retry_service';
interface AnalyticsIndexParams {
esClient: ElasticsearchClient;
logger: Logger;
indexName: string;
indexAlias: string;
indexVersion: number;
isServerless: boolean;
mappings: MappingTypeMapping;
painlessScript: StoredScript;
painlessScriptId: string;
sourceIndex: string;
sourceQuery: QueryDslQueryContainer;
taskId: string;
taskManager: TaskManagerStartContract;
}
interface MappingMeta {
mapping_version: number;
painless_script_id: string;
}
export class AnalyticsIndex {
private readonly logger: Logger;
private readonly indexName: string;
private readonly indexAlias: string;
private readonly indexVersion: number;
private readonly esClient: ElasticsearchClient;
private readonly mappings: MappingTypeMapping;
private readonly indexSettings?: IndicesIndexSettings;
private readonly painlessScriptId: string;
private readonly painlessScript: StoredScript;
private readonly retryService: CasesAnalyticsRetryService;
private readonly taskManager: TaskManagerStartContract;
private readonly taskId: string;
private readonly sourceIndex: string;
private readonly sourceQuery: QueryDslQueryContainer;
constructor({
logger,
esClient,
isServerless,
indexName,
indexAlias,
indexVersion,
mappings,
painlessScriptId,
painlessScript,
taskManager,
taskId,
sourceIndex,
sourceQuery,
}: AnalyticsIndexParams) {
this.logger = logger;
this.esClient = esClient;
this.indexName = indexName;
this.indexAlias = indexAlias;
this.indexVersion = indexVersion;
this.mappings = mappings;
this.mappings._meta = this.getMappingMeta({ indexVersion, painlessScriptId });
this.painlessScriptId = painlessScriptId;
this.painlessScript = painlessScript;
this.taskManager = taskManager;
this.taskId = taskId;
this.sourceIndex = sourceIndex;
this.sourceQuery = sourceQuery;
this.indexSettings = {
// settings are not supported on serverless ES
...(isServerless
? {}
: {
number_of_shards: CAI_NUMBER_OF_SHARDS,
auto_expand_replicas: CAI_AUTO_EXPAND_REPLICAS,
refresh_interval: CAI_REFRESH_INTERVAL,
mode: CAI_INDEX_MODE,
}),
};
/**
* We should wait at least 5ms before retrying and no more than 2 sec
*/
const backOffFactory = fullJitterBackoffFactory({ baseDelay: 5, maxBackoffTime: 2000 });
this.retryService = new CasesAnalyticsRetryService(this.logger, backOffFactory);
}
public async upsertIndex() {
try {
await this.retryService.retryWithBackoff(() => this._upsertIndex());
} catch (error) {
// We do not throw because errors should not break execution
this.logger.error(
`[${this.indexName}] Failed to create index. Error message: ${error.message}`
);
}
}
private async _upsertIndex() {
try {
const indexExists = await this.indexExists();
if (!indexExists) {
this.logDebug('Index does not exist. Creating.');
await this.createIndexMapping();
} else {
this.logDebug('Index exists. Checking mapping.');
await this.updateIndexMapping();
}
} catch (error) {
this.handleError('Failed to create the index.', error);
}
}
private async updateIndexMapping() {
try {
const shouldUpdateMapping = await this.shouldUpdateMapping();
if (shouldUpdateMapping) {
await this.updateMapping();
} else {
this.logDebug('Mapping version is up to date. Skipping update.');
}
} catch (error) {
this.handleError('Failed to update the index mapping.', error);
}
}
private async getCurrentMapping() {
return this.esClient.indices.getMapping({
index: this.indexName,
});
}
private async updateMapping() {
this.logDebug(`Updating the painless script.`);
await this.putScript();
this.logDebug(`Updating index mapping.`);
await this.esClient.indices.putMapping({
index: this.indexName,
...this.mappings,
});
this.logDebug(`Scheduling the backfill task.`);
await this.scheduleBackfillTask();
}
private async createIndexMapping() {
this.logDebug(`Creating painless script.`);
await this.putScript();
this.logDebug(`Creating index.`);
await this.esClient.indices.create({
index: this.indexName,
timeout: CAI_DEFAULT_TIMEOUT,
mappings: this.mappings,
settings: {
index: this.indexSettings,
},
aliases: {
[this.indexAlias]: {
is_write_index: true,
},
},
});
this.logDebug(`Scheduling the backfill task.`);
await this.scheduleBackfillTask();
}
private async indexExists(): Promise<boolean> {
this.logDebug(`Checking if index exists.`);
return this.esClient.indices.exists({
index: this.indexName,
});
}
private async shouldUpdateMapping(): Promise<boolean> {
const currentMapping = await this.getCurrentMapping();
return currentMapping[this.indexName].mappings._meta?.mapping_version < this.indexVersion;
}
private async putScript() {
await this.esClient.putScript({
id: this.painlessScriptId,
script: this.painlessScript,
});
}
private getMappingMeta({
indexVersion,
painlessScriptId,
}: {
indexVersion: number;
painlessScriptId: string;
}): MappingMeta {
this.logDebug(
`Constructing mapping._meta. Index version: ${indexVersion}. Painless script: ${painlessScriptId}.`
);
return {
mapping_version: indexVersion,
painless_script_id: painlessScriptId,
};
}
public logDebug(message: string) {
this.logger.debug(`[${this.indexName}] ${message}`, {
tags: ['cai-index-creation', this.indexName],
});
}
private handleError(message: string, error: EsErrors.ElasticsearchClientError) {
this.logger.error(`[${this.indexName}] ${message} Error message: ${error.message}`);
throw error;
}
private async scheduleBackfillTask() {
await scheduleCAIBackfillTask({
taskId: this.taskId,
sourceIndex: this.sourceIndex,
sourceQuery: this.sourceQuery,
destIndex: this.indexName,
taskManager: this.taskManager,
logger: this.logger,
});
}
}

@@ -0,0 +1,102 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
import { ALERTING_CASES_SAVED_OBJECT_INDEX } from '@kbn/core-saved-objects-server';
export const CAI_ATTACHMENTS_INDEX_NAME = '.internal.cases-attachments';
export const CAI_ATTACHMENTS_INDEX_ALIAS = '.cases-attachments';
export const CAI_ATTACHMENTS_INDEX_VERSION = 1;
export const CAI_ATTACHMENTS_SOURCE_QUERY: QueryDslQueryContainer = {
bool: {
must: [
{
term: {
type: 'cases-comments',
},
},
{
bool: {
should: [
{
term: {
'cases-comments.type': 'externalReference',
},
},
{
term: {
'cases-comments.type': 'alert',
},
},
],
minimum_should_match: 1,
},
},
],
},
};
export const CAI_ATTACHMENTS_SOURCE_INDEX = ALERTING_CASES_SAVED_OBJECT_INDEX;
export const CAI_ATTACHMENTS_BACKFILL_TASK_ID = 'cai_attachments_backfill_task';
export const CAI_ATTACHMENTS_SYNCHRONIZATION_TASK_ID = 'cai_cases_attachments_synchronization_task';
export const getAttachmentsSynchronizationSourceQuery = (
lastSyncAt: Date
): QueryDslQueryContainer => ({
bool: {
must: [
{
term: {
type: 'cases-comments',
},
},
{
bool: {
should: [
{
term: {
'cases-comments.type': 'externalReference',
},
},
{
term: {
'cases-comments.type': 'alert',
},
},
],
minimum_should_match: 1,
},
},
{
bool: {
should: [
{
range: {
'cases-comments.created_at': {
gte: lastSyncAt.toISOString(),
},
},
},
{
range: {
'cases-comments.updated_at': {
gte: lastSyncAt.toISOString(),
},
},
},
],
},
},
],
},
});

@@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server';
import { AnalyticsIndex } from '../analytics_index';
import {
CAI_ATTACHMENTS_INDEX_NAME,
CAI_ATTACHMENTS_INDEX_ALIAS,
CAI_ATTACHMENTS_INDEX_VERSION,
CAI_ATTACHMENTS_SOURCE_INDEX,
CAI_ATTACHMENTS_SOURCE_QUERY,
CAI_ATTACHMENTS_BACKFILL_TASK_ID,
CAI_ATTACHMENTS_SYNCHRONIZATION_TASK_ID,
} from './constants';
import { CAI_ATTACHMENTS_INDEX_MAPPINGS } from './mappings';
import { CAI_ATTACHMENTS_INDEX_SCRIPT, CAI_ATTACHMENTS_INDEX_SCRIPT_ID } from './painless_scripts';
import { scheduleCAISynchronizationTask } from '../tasks/synchronization_task';
export const createAttachmentsAnalyticsIndex = ({
esClient,
logger,
isServerless,
taskManager,
}: {
esClient: ElasticsearchClient;
logger: Logger;
isServerless: boolean;
taskManager: TaskManagerStartContract;
}): AnalyticsIndex =>
new AnalyticsIndex({
logger,
esClient,
isServerless,
taskManager,
indexName: CAI_ATTACHMENTS_INDEX_NAME,
indexAlias: CAI_ATTACHMENTS_INDEX_ALIAS,
indexVersion: CAI_ATTACHMENTS_INDEX_VERSION,
mappings: CAI_ATTACHMENTS_INDEX_MAPPINGS,
painlessScriptId: CAI_ATTACHMENTS_INDEX_SCRIPT_ID,
painlessScript: CAI_ATTACHMENTS_INDEX_SCRIPT,
taskId: CAI_ATTACHMENTS_BACKFILL_TASK_ID,
sourceIndex: CAI_ATTACHMENTS_SOURCE_INDEX,
sourceQuery: CAI_ATTACHMENTS_SOURCE_QUERY,
});
export const scheduleAttachmentsAnalyticsSyncTask = ({
taskManager,
logger,
}: {
taskManager: TaskManagerStartContract;
logger: Logger;
}) => {
scheduleCAISynchronizationTask({
taskId: CAI_ATTACHMENTS_SYNCHRONIZATION_TASK_ID,
sourceIndex: CAI_ATTACHMENTS_SOURCE_INDEX,
destIndex: CAI_ATTACHMENTS_INDEX_NAME,
taskManager,
logger,
}).catch((e) => {
logger.error(
`Error scheduling ${CAI_ATTACHMENTS_SYNCHRONIZATION_TASK_ID} task, received ${e.message}`
);
});
};

@@ -0,0 +1,78 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types';
export const CAI_ATTACHMENTS_INDEX_MAPPINGS: MappingTypeMapping = {
dynamic: false,
properties: {
'@timestamp': {
type: 'date',
},
case_id: {
type: 'keyword',
},
type: {
type: 'keyword',
},
created_at: {
type: 'date',
},
created_by: {
properties: {
username: {
type: 'keyword',
},
profile_uid: {
type: 'keyword',
},
full_name: {
type: 'keyword',
},
email: {
type: 'keyword',
},
},
},
payload: {
properties: {
alerts: {
properties: {
id: {
type: 'keyword',
},
index: {
type: 'keyword',
},
},
},
file: {
properties: {
id: {
type: 'keyword',
},
extension: {
type: 'keyword',
},
mimeType: {
type: 'keyword',
},
name: {
type: 'keyword',
},
},
},
},
},
owner: {
type: 'keyword',
},
space_ids: {
type: 'keyword',
},
},
};

@@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { StoredScript } from '@elastic/elasticsearch/lib/api/types';
import { CAI_ATTACHMENTS_INDEX_VERSION } from './constants';
export const CAI_ATTACHMENTS_INDEX_SCRIPT_ID = `cai_attachments_script_${CAI_ATTACHMENTS_INDEX_VERSION}`;
export const CAI_ATTACHMENTS_INDEX_SCRIPT: StoredScript = {
lang: 'painless',
source: `
def source = [:];
source.putAll(ctx._source);
ctx._source.clear();
if (
(
source["cases-comments"].type == "externalReference" &&
source["cases-comments"].externalReferenceAttachmentTypeId != ".files"
) &&
source["cases-comments"].type != "alert"
) {
ctx.op = "noop";
return;
}
long timestampInMillis = new Date().getTime();
Instant timestampInstance = Instant.ofEpochMilli(timestampInMillis);
ctx._source['@timestamp'] = ZonedDateTime.ofInstant(timestampInstance, ZoneId.of('Z'));
ctx._source.type = source["cases-comments"].type;
if (
ctx._source.type == "alert" &&
source["cases-comments"].alertId != null &&
source["cases-comments"].index != null
) {
ctx._source.payload = new HashMap();
ctx._source.payload.alerts = new ArrayList();
for (int y = 0; y < source["cases-comments"].alertId.size(); y++) {
Map alert = new HashMap();
alert.id = source["cases-comments"].alertId[y];
if ( y < source["cases-comments"].index.size() ) {
alert.index = source["cases-comments"].index[y];
}
ctx._source.payload.alerts.add(alert);
}
}
if (
ctx._source.type == "externalReference" &&
source["cases-comments"].externalReferenceAttachmentTypeId == ".files" &&
source["cases-comments"].externalReferenceMetadata.files.size() > 0
) {
ctx._source.payload = new HashMap();
ctx._source.payload.file = new HashMap();
ctx._source.payload.file.extension = source["cases-comments"].externalReferenceMetadata.files[0].extension;
ctx._source.payload.file.mimeType = source["cases-comments"].externalReferenceMetadata.files[0].mimeType;
ctx._source.payload.file.name = source["cases-comments"].externalReferenceMetadata.files[0].name;
}
if (source.references != null) {
for (item in source.references) {
if (item.type == "file") {
ctx._source.payload.file.id = item.id;
} else if (item.type == "cases") {
ctx._source.case_id = item.id;
}
}
}
ctx._source.owner = source["cases-comments"].owner;
ctx._source.space_ids = source.namespaces;
`,
};

@@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
import { ALERTING_CASES_SAVED_OBJECT_INDEX } from '@kbn/core-saved-objects-server';
export const CAI_CASES_INDEX_NAME = '.internal.cases';
export const CAI_CASES_INDEX_ALIAS = '.cases';
export const CAI_CASES_INDEX_VERSION = 1;
export const CAI_CASES_SOURCE_QUERY: QueryDslQueryContainer = {
term: {
type: 'cases',
},
};
export const CAI_CASES_SOURCE_INDEX = ALERTING_CASES_SAVED_OBJECT_INDEX;
export const CAI_CASES_BACKFILL_TASK_ID = 'cai_cases_backfill_task';
export const CAI_CASES_SYNCHRONIZATION_TASK_ID = 'cai_cases_synchronization_task';
export const getCasesSynchronizationSourceQuery = (lastSyncAt: Date): QueryDslQueryContainer => ({
bool: {
must: [
{
term: {
type: 'cases',
},
},
{
bool: {
should: [
{
range: {
'cases.created_at': {
gte: lastSyncAt.toISOString(),
},
},
},
{
range: {
'cases.updated_at': {
gte: lastSyncAt.toISOString(),
},
},
},
],
},
},
],
},
});

@@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server';
import { AnalyticsIndex } from '../analytics_index';
import {
CAI_CASES_INDEX_NAME,
CAI_CASES_INDEX_ALIAS,
CAI_CASES_INDEX_VERSION,
CAI_CASES_SOURCE_INDEX,
CAI_CASES_SOURCE_QUERY,
CAI_CASES_BACKFILL_TASK_ID,
CAI_CASES_SYNCHRONIZATION_TASK_ID,
} from './constants';
import { CAI_CASES_INDEX_MAPPINGS } from './mappings';
import { CAI_CASES_INDEX_SCRIPT_ID, CAI_CASES_INDEX_SCRIPT } from './painless_scripts';
import { scheduleCAISynchronizationTask } from '../tasks/synchronization_task';
export const createCasesAnalyticsIndex = ({
esClient,
logger,
isServerless,
taskManager,
}: {
esClient: ElasticsearchClient;
logger: Logger;
isServerless: boolean;
taskManager: TaskManagerStartContract;
}): AnalyticsIndex =>
new AnalyticsIndex({
logger,
esClient,
isServerless,
taskManager,
indexName: CAI_CASES_INDEX_NAME,
indexAlias: CAI_CASES_INDEX_ALIAS,
indexVersion: CAI_CASES_INDEX_VERSION,
mappings: CAI_CASES_INDEX_MAPPINGS,
painlessScriptId: CAI_CASES_INDEX_SCRIPT_ID,
painlessScript: CAI_CASES_INDEX_SCRIPT,
taskId: CAI_CASES_BACKFILL_TASK_ID,
sourceIndex: CAI_CASES_SOURCE_INDEX,
sourceQuery: CAI_CASES_SOURCE_QUERY,
});
export const scheduleCasesAnalyticsSyncTask = ({
taskManager,
logger,
}: {
taskManager: TaskManagerStartContract;
logger: Logger;
}) => {
scheduleCAISynchronizationTask({
taskId: CAI_CASES_SYNCHRONIZATION_TASK_ID,
sourceIndex: CAI_CASES_SOURCE_INDEX,
destIndex: CAI_CASES_INDEX_NAME,
taskManager,
logger,
}).catch((e) => {
logger.error(
`Error scheduling ${CAI_CASES_SYNCHRONIZATION_TASK_ID} task, received ${e.message}`
);
});
};

@@ -0,0 +1,163 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types';
export const CAI_CASES_INDEX_MAPPINGS: MappingTypeMapping = {
dynamic: false,
properties: {
'@timestamp': {
type: 'date',
},
title: {
type: 'text',
fields: {
keyword: {
type: 'keyword',
},
},
},
description: {
type: 'text',
},
tags: {
type: 'keyword',
},
category: {
type: 'keyword',
},
status: {
type: 'keyword',
},
status_sort: {
type: 'short',
},
severity: {
type: 'keyword',
},
severity_sort: {
type: 'short',
},
created_at: {
type: 'date',
},
created_at_ms: {
type: 'long',
},
created_by: {
properties: {
username: {
type: 'keyword',
},
profile_uid: {
type: 'keyword',
},
full_name: {
type: 'keyword',
},
email: {
type: 'keyword',
},
},
},
updated_at: {
type: 'date',
},
updated_at_ms: {
type: 'long',
},
updated_by: {
properties: {
username: {
type: 'keyword',
},
profile_uid: {
type: 'keyword',
},
full_name: {
type: 'keyword',
},
email: {
type: 'keyword',
},
},
},
closed_at: {
type: 'date',
},
closed_at_ms: {
type: 'long',
},
closed_by: {
properties: {
username: {
type: 'keyword',
},
profile_uid: {
type: 'keyword',
},
full_name: {
type: 'keyword',
},
email: {
type: 'keyword',
},
},
},
assignees: {
type: 'keyword',
},
time_to_resolve: {
type: 'long',
},
time_to_acknowledge: {
type: 'long',
},
time_to_investigate: {
type: 'long',
},
custom_fields: {
properties: {
type: {
type: 'keyword',
},
key: {
type: 'keyword',
},
value: {
type: 'keyword',
},
},
},
observables: {
properties: {
type: {
// called typeKey in the cases mapping
type: 'keyword',
},
value: {
type: 'keyword',
},
},
},
total_assignees: {
type: 'integer',
},
owner: {
type: 'keyword',
},
space_ids: {
type: 'keyword',
},
total_alerts: {
type: 'integer',
},
total_comments: {
type: 'integer',
},
},
};

@@ -0,0 +1,164 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { StoredScript } from '@elastic/elasticsearch/lib/api/types';
import { CAI_CASES_INDEX_VERSION } from './constants';
export const CAI_CASES_INDEX_SCRIPT_ID = `cai_cases_script_${CAI_CASES_INDEX_VERSION}`;
export const CAI_CASES_INDEX_SCRIPT: StoredScript = {
lang: 'painless',
source: `
String statusDecoder(def x) {
if (x == 0) {
return "open";
}
if (x == 10) {
return "in-progress";
}
if (x == 20) {
return "closed";
}
return "";
}
String severityDecoder(def x) {
if (x == 0) {
return "low"
}
if (x == 10) {
return "medium"
}
if (x == 20) {
return "high"
}
if (x == 30) {
return "critical"
}
return ""
}
def source = [:];
source.putAll(ctx._source);
ctx._source.clear();
long milliSinceEpoch = new Date().getTime();
Instant instant = Instant.ofEpochMilli(milliSinceEpoch);
ctx._source['@timestamp'] = ZonedDateTime.ofInstant(instant, ZoneId.of('Z'));
ctx._source.title = source.cases.title;
ctx._source.description = source.cases.description;
ctx._source.tags = source.cases.tags;
ctx._source.category = source.cases.category;
ctx._source.status_sort = source.cases.status;
ctx._source.status = statusDecoder(ctx._source.status_sort);
ctx._source.severity_sort = source.cases.severity;
ctx._source.severity = severityDecoder(ctx._source.severity_sort);
ZonedDateTime zdt_created =
ZonedDateTime.parse(source.cases.created_at);
ctx._source.created_at_ms = zdt_created.toInstant().toEpochMilli();
ctx._source.created_at = source.cases.created_at;
if (source.cases.created_by != null) {
ctx._source.created_by = new HashMap();
ctx._source.created_by.full_name = source.cases.created_by.full_name;
ctx._source.created_by.username = source.cases.created_by.username;
ctx._source.created_by.profile_uid = source.cases.created_by.profile_uid;
ctx._source.created_by.email = source.cases.created_by.email;
}
if ( source.cases.updated_at != null ) {
ZonedDateTime zdt_updated =
ZonedDateTime.parse(source.cases.updated_at);
ctx._source.updated_at_ms = zdt_updated.toInstant().toEpochMilli();
ctx._source.updated_at = source.cases.updated_at;
}
if (source.cases.updated_by != null) {
ctx._source.updated_by = new HashMap();
ctx._source.updated_by.full_name = source.cases.updated_by.full_name;
ctx._source.updated_by.username = source.cases.updated_by.username;
ctx._source.updated_by.profile_uid = source.cases.updated_by.profile_uid;
ctx._source.updated_by.email = source.cases.updated_by.email;
}
if ( source.cases.closed_at != null ) {
ZonedDateTime zdt_closed =
ZonedDateTime.parse(source.cases.closed_at);
ctx._source.closed_at_ms = zdt_closed.toInstant().toEpochMilli();
ctx._source.closed_at = source.cases.closed_at;
}
if (source.cases.closed_by != null) {
ctx._source.closed_by = new HashMap();
ctx._source.closed_by.full_name = source.cases.closed_by.full_name;
ctx._source.closed_by.username = source.cases.closed_by.username;
ctx._source.closed_by.profile_uid = source.cases.closed_by.profile_uid;
ctx._source.closed_by.email = source.cases.closed_by.email;
}
ctx._source.assignees = [];
if (source.cases.assignees != null) {
for (item in source.cases.assignees) {
ctx._source.assignees.add(item.uid);
}
ctx._source.total_assignees = source.cases.assignees.size();
}
ctx._source.custom_fields = [];
if (source.cases.customFields != null) {
for (item in source.cases.customFields) {
Map customField = new HashMap();
customField.type = item.type;
customField.value = item.value;
customField.key = item.key;
ctx._source.custom_fields.add(customField);
}
}
ctx._source.observables = [];
if (source.cases.observables != null) {
for (item in source.cases.observables) {
Map observable = new HashMap();
observable.label = item.label;
observable.type = item.typeKey;
observable.value = item.value;
ctx._source.observables.add(observable);
}
}
ctx._source.owner = source.cases.owner;
ctx._source.space_ids = source.namespaces;
if (source.cases.time_to_acknowledge != null){
ctx._source.time_to_acknowledge = source.cases.time_to_acknowledge;
}
if (source.cases.time_to_investigate != null){
ctx._source.time_to_investigate = source.cases.time_to_investigate;
}
if (source.cases.time_to_resolve != null){
ctx._source.time_to_resolve = source.cases.time_to_resolve;
}
if (source.cases.total_alerts != null && source.cases.total_alerts >= 0){
ctx._source.total_alerts = source.cases.total_alerts;
}
if (source.cases.total_comments != null && source.cases.total_comments >= 0){
ctx._source.total_comments = source.cases.total_comments;
}
`,
};

@@ -0,0 +1,78 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
import { ALERTING_CASES_SAVED_OBJECT_INDEX } from '@kbn/core-saved-objects-server';
export const CAI_COMMENTS_INDEX_NAME = '.internal.cases-comments';
export const CAI_COMMENTS_INDEX_ALIAS = '.cases-comments';
export const CAI_COMMENTS_INDEX_VERSION = 1;
export const CAI_COMMENTS_SOURCE_QUERY: QueryDslQueryContainer = {
bool: {
filter: [
{
term: {
type: 'cases-comments',
},
},
{
term: {
'cases-comments.type': 'user',
},
},
],
},
};
export const CAI_COMMENTS_SOURCE_INDEX = ALERTING_CASES_SAVED_OBJECT_INDEX;
export const CAI_COMMENTS_BACKFILL_TASK_ID = 'cai_comments_backfill_task';
export const CAI_COMMENTS_SYNCHRONIZATION_TASK_ID = 'cai_cases_comments_synchronization_task';
export const getCommentsSynchronizationSourceQuery = (
lastSyncAt: Date
): QueryDslQueryContainer => ({
bool: {
must: [
{
term: {
'cases-comments.type': 'user',
},
},
{
term: {
type: 'cases-comments',
},
},
{
bool: {
should: [
{
range: {
'cases-comments.created_at': {
gte: lastSyncAt.toISOString(),
},
},
},
{
range: {
'cases-comments.updated_at': {
gte: lastSyncAt.toISOString(),
},
},
},
],
},
},
],
},
});

@@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server';
import { AnalyticsIndex } from '../analytics_index';
import {
CAI_COMMENTS_INDEX_NAME,
CAI_COMMENTS_INDEX_ALIAS,
CAI_COMMENTS_INDEX_VERSION,
CAI_COMMENTS_SOURCE_INDEX,
CAI_COMMENTS_SOURCE_QUERY,
CAI_COMMENTS_BACKFILL_TASK_ID,
CAI_COMMENTS_SYNCHRONIZATION_TASK_ID,
} from './constants';
import { CAI_COMMENTS_INDEX_MAPPINGS } from './mappings';
import { CAI_COMMENTS_INDEX_SCRIPT, CAI_COMMENTS_INDEX_SCRIPT_ID } from './painless_scripts';
import { scheduleCAISynchronizationTask } from '../tasks/synchronization_task';
export const createCommentsAnalyticsIndex = ({
esClient,
logger,
isServerless,
taskManager,
}: {
esClient: ElasticsearchClient;
logger: Logger;
isServerless: boolean;
taskManager: TaskManagerStartContract;
}): AnalyticsIndex =>
new AnalyticsIndex({
logger,
esClient,
isServerless,
taskManager,
indexName: CAI_COMMENTS_INDEX_NAME,
indexAlias: CAI_COMMENTS_INDEX_ALIAS,
indexVersion: CAI_COMMENTS_INDEX_VERSION,
mappings: CAI_COMMENTS_INDEX_MAPPINGS,
painlessScriptId: CAI_COMMENTS_INDEX_SCRIPT_ID,
painlessScript: CAI_COMMENTS_INDEX_SCRIPT,
taskId: CAI_COMMENTS_BACKFILL_TASK_ID,
sourceIndex: CAI_COMMENTS_SOURCE_INDEX,
sourceQuery: CAI_COMMENTS_SOURCE_QUERY,
});
export const scheduleCommentsAnalyticsSyncTask = ({
taskManager,
logger,
}: {
taskManager: TaskManagerStartContract;
logger: Logger;
}) => {
scheduleCAISynchronizationTask({
taskId: CAI_COMMENTS_SYNCHRONIZATION_TASK_ID,
sourceIndex: CAI_COMMENTS_SOURCE_INDEX,
destIndex: CAI_COMMENTS_INDEX_NAME,
taskManager,
logger,
}).catch((e) => {
logger.error(
`Error scheduling ${CAI_COMMENTS_SYNCHRONIZATION_TASK_ID} task, received ${e.message}`
);
});
};

@@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types';
export const CAI_COMMENTS_INDEX_MAPPINGS: MappingTypeMapping = {
dynamic: false,
properties: {
'@timestamp': {
type: 'date',
},
case_id: {
type: 'keyword',
},
comment: {
type: 'text',
},
created_at: {
type: 'date',
},
created_by: {
properties: {
username: {
type: 'keyword',
},
profile_uid: {
type: 'keyword',
},
full_name: {
type: 'keyword',
},
email: {
type: 'keyword',
},
},
},
updated_at: {
type: 'date',
},
updated_by: {
properties: {
username: {
type: 'keyword',
},
profile_uid: {
type: 'keyword',
},
full_name: {
type: 'keyword',
},
email: {
type: 'keyword',
},
},
},
owner: {
type: 'keyword',
},
space_ids: {
type: 'keyword',
},
},
};

@@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { StoredScript } from '@elastic/elasticsearch/lib/api/types';
import { CAI_COMMENTS_INDEX_VERSION } from './constants';
export const CAI_COMMENTS_INDEX_SCRIPT_ID = `cai_comments_script_${CAI_COMMENTS_INDEX_VERSION}`;
export const CAI_COMMENTS_INDEX_SCRIPT: StoredScript = {
lang: 'painless',
source: `
def source = [:];
source.putAll(ctx._source);
ctx._source.clear();
long milliSinceEpoch = new Date().getTime();
Instant instant = Instant.ofEpochMilli(milliSinceEpoch);
ctx._source['@timestamp'] = ZonedDateTime.ofInstant(instant, ZoneId.of('Z'));
ctx._source.comment = source["cases-comments"].comment;
ctx._source.created_at = source["cases-comments"].created_at;
ctx._source.created_by = source["cases-comments"].created_by;
ctx._source.owner = source["cases-comments"].owner;
ctx._source.space_ids = source.namespaces;
if ( source["cases-comments"].updated_at != null ) {
ctx._source.updated_at = source["cases-comments"].updated_at;
}
if (source["cases-comments"].updated_by != null) {
ctx._source.updated_by = new HashMap();
ctx._source.updated_by.full_name = source["cases-comments"].updated_by.full_name;
ctx._source.updated_by.username = source["cases-comments"].updated_by.username;
ctx._source.updated_by.profile_uid = source["cases-comments"].updated_by.profile_uid;
ctx._source.updated_by.email = source["cases-comments"].updated_by.email;
}
if (source.references != null) {
for (item in source.references) {
if (item.type == "cases") {
ctx._source.case_id = item.id;
}
}
}
`,
};

@@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
import {
CAI_ATTACHMENTS_INDEX_NAME,
getAttachmentsSynchronizationSourceQuery,
} from './attachments_index/constants';
import { CAI_CASES_INDEX_NAME, getCasesSynchronizationSourceQuery } from './cases_index/constants';
import {
CAI_COMMENTS_INDEX_NAME,
getCommentsSynchronizationSourceQuery,
} from './comments_index/constants';
import {
CAI_ACTIVITY_INDEX_NAME,
getActivitySynchronizationSourceQuery,
} from './activity_index/constants';
export const CAI_NUMBER_OF_SHARDS = 1;
/** Allocate 1 replica if there are enough data nodes, otherwise continue with 0 */
export const CAI_AUTO_EXPAND_REPLICAS = '0-1';
export const CAI_REFRESH_INTERVAL = '15s';
export const CAI_INDEX_MODE = 'lookup';
/**
* When a request takes a long time to complete and hits the timeout or the
* client aborts that request due to the requestTimeout, our only course of
* action is to retry that request. This places our request at the end of the
* queue and adds more load to Elasticsearch just making things worse.
*
* So we want to choose as long a timeout as possible. Some load balancers /
* reverse proxies like ELB ignore TCP keep-alive packets so unless there's a
* request or response sent over the socket it will be dropped after 60s.
*/
export const CAI_DEFAULT_TIMEOUT = '300s';
export const SYNCHRONIZATION_QUERIES_DICTIONARY: Record<
string,
(lastSyncAt: Date) => QueryDslQueryContainer
> = {
[CAI_CASES_INDEX_NAME]: getCasesSynchronizationSourceQuery,
[CAI_COMMENTS_INDEX_NAME]: getCommentsSynchronizationSourceQuery,
[CAI_ATTACHMENTS_INDEX_NAME]: getAttachmentsSynchronizationSourceQuery,
[CAI_ACTIVITY_INDEX_NAME]: getActivitySynchronizationSourceQuery,
};

@@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { CoreSetup, ElasticsearchClient, Logger } from '@kbn/core/server';
import type {
TaskManagerSetupContract,
TaskManagerStartContract,
} from '@kbn/task-manager-plugin/server';
import type { CasesServerStartDependencies } from '../types';
import { registerCAIBackfillTask } from './tasks/backfill_task';
import { registerCAISynchronizationTask } from './tasks/synchronization_task';
import {
createAttachmentsAnalyticsIndex,
scheduleAttachmentsAnalyticsSyncTask,
} from './attachments_index';
import { createCasesAnalyticsIndex, scheduleCasesAnalyticsSyncTask } from './cases_index';
import { createCommentsAnalyticsIndex, scheduleCommentsAnalyticsSyncTask } from './comments_index';
import { createActivityAnalyticsIndex, scheduleActivityAnalyticsSyncTask } from './activity_index';
export const createCasesAnalyticsIndexes = ({
esClient,
logger,
isServerless,
taskManager,
}: {
esClient: ElasticsearchClient;
logger: Logger;
isServerless: boolean;
taskManager: TaskManagerStartContract;
}) => {
const casesIndex = createCasesAnalyticsIndex({
logger,
esClient,
isServerless,
taskManager,
});
const casesAttachmentsIndex = createCommentsAnalyticsIndex({
logger,
esClient,
isServerless,
taskManager,
});
const casesCommentsIndex = createAttachmentsAnalyticsIndex({
logger,
esClient,
isServerless,
taskManager,
});
const casesActivityIndex = createActivityAnalyticsIndex({
logger,
esClient,
isServerless,
taskManager,
});
return Promise.all([
casesIndex.upsertIndex(),
casesAttachmentsIndex.upsertIndex(),
casesCommentsIndex.upsertIndex(),
casesActivityIndex.upsertIndex(),
]);
};
export const registerCasesAnalyticsIndexesTasks = ({
taskManager,
logger,
core,
}: {
taskManager: TaskManagerSetupContract;
logger: Logger;
core: CoreSetup<CasesServerStartDependencies>;
}) => {
registerCAIBackfillTask({ taskManager, logger, core });
registerCAISynchronizationTask({ taskManager, logger, core });
};
export const scheduleCasesAnalyticsSyncTasks = ({
taskManager,
logger,
}: {
taskManager: TaskManagerStartContract;
logger: Logger;
}) => {
scheduleActivityAnalyticsSyncTask({ taskManager, logger });
scheduleCasesAnalyticsSyncTask({ taskManager, logger });
scheduleCommentsAnalyticsSyncTask({ taskManager, logger });
scheduleAttachmentsAnalyticsSyncTask({ taskManager, logger });
};

@@ -0,0 +1,103 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { loggingSystemMock } from '@kbn/core-logging-browser-mocks';
import type { Logger } from '@kbn/core/server';
import { errors as EsErrors } from '@elastic/elasticsearch';
import { CasesAnalyticsRetryService } from './cases_analytics_retry_service';
import type { BackoffFactory } from '../../common/retry_service/types';
describe('CasesAnalyticsRetryService', () => {
const nextBackOff = jest.fn();
const cb = jest.fn();
const retryableError = new EsErrors.ConnectionError('My retryable error');
const backOffFactory: BackoffFactory = {
create: () => ({ nextBackOff }),
};
const mockLogger = loggingSystemMock.create().get() as jest.Mocked<Logger>;
let service: CasesAnalyticsRetryService;
beforeEach(() => {
jest.clearAllMocks();
nextBackOff.mockReturnValue(1);
service = new CasesAnalyticsRetryService(mockLogger, backOffFactory);
});
it('should not retry if the error is not a retryable ElasticsearchClientError', async () => {
cb.mockRejectedValue(new Error('My error'));
await expect(() => service.retryWithBackoff(cb)).rejects.toThrowErrorMatchingInlineSnapshot(
`"My error"`
);
expect(cb).toBeCalledTimes(1);
expect(nextBackOff).not.toBeCalled();
});
it('should not retry after trying more than the max attempts', async () => {
const maxAttempts = 3;
service = new CasesAnalyticsRetryService(mockLogger, backOffFactory, maxAttempts);
cb.mockRejectedValue(retryableError);
await expect(() => service.retryWithBackoff(cb)).rejects.toThrowErrorMatchingInlineSnapshot(
`"My retryable error"`
);
expect(cb).toBeCalledTimes(maxAttempts + 1);
expect(nextBackOff).toBeCalledTimes(maxAttempts);
});
it('should succeed if cb does not throw', async () => {
service = new CasesAnalyticsRetryService(mockLogger, backOffFactory);
cb.mockResolvedValue({ status: 'ok' });
const res = await service.retryWithBackoff(cb);
expect(nextBackOff).toBeCalledTimes(0);
expect(cb).toBeCalledTimes(1);
expect(res).toEqual({ status: 'ok' });
});
describe('Logging', () => {
it('should log a warning when retrying', async () => {
service = new CasesAnalyticsRetryService(mockLogger, backOffFactory, 2);
cb.mockRejectedValue(retryableError);
await expect(() => service.retryWithBackoff(cb)).rejects.toThrowErrorMatchingInlineSnapshot(
`"My retryable error"`
);
expect(mockLogger.warn).toBeCalledTimes(2);
expect(mockLogger.warn).toHaveBeenNthCalledWith(
1,
'[CasesAnalytics][retryWithBackoff] Failed with error "My retryable error". Attempt for retry: 1'
);
expect(mockLogger.warn).toHaveBeenNthCalledWith(
2,
'[CasesAnalytics][retryWithBackoff] Failed with error "My retryable error". Attempt for retry: 2'
);
});
it('should not log a warning when the error is not supported', async () => {
cb.mockRejectedValue(new Error('My error'));
await expect(() => service.retryWithBackoff(cb)).rejects.toThrowErrorMatchingInlineSnapshot(
`"My error"`
);
expect(mockLogger.warn).not.toBeCalled();
});
});
});

View file

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Logger } from '@kbn/core/server';
import type { errors as EsErrors } from '@elastic/elasticsearch';
import { isRetryableEsClientError } from '../utils';
import type { BackoffFactory } from '../../common/retry_service/types';
import { RetryService } from '../../common/retry_service';
export class CasesAnalyticsRetryService extends RetryService {
constructor(logger: Logger, backOffFactory: BackoffFactory, maxAttempts: number = 10) {
super(logger, backOffFactory, 'CasesAnalytics', maxAttempts);
}
protected isRetryableError(error: EsErrors.ElasticsearchClientError) {
if (isRetryableEsClientError(error)) {
return true;
}
this.logger.debug(`[${this.serviceName}][isRetryableError] Error is not retryable`, {
tags: ['cai:retry-error'],
});
return false;
}
}
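
A minimal usage sketch of the retry service, assuming it is called from a sibling module. The backoff settings mirror the ones the cases connector uses further down in this PR (5ms base delay, 2s cap), and the index name is just an example.

```ts
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import { fullJitterBackoffFactory } from '../../common/retry_service';
import { CasesAnalyticsRetryService } from './cases_analytics_retry_service';

// Retries the callback with full-jitter backoff for as long as the thrown error
// is a retryable ES client error (connection, timeout, throttling, etc.).
export async function getMappingWithRetries(esClient: ElasticsearchClient, logger: Logger) {
  const backOffFactory = fullJitterBackoffFactory({ baseDelay: 5, maxBackoffTime: 2000 });
  const retryService = new CasesAnalyticsRetryService(logger, backOffFactory);

  return retryService.retryWithBackoff(() =>
    esClient.indices.getMapping({ index: '.internal.cases' })
  );
}
```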

View file

@ -0,0 +1,8 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export { CasesAnalyticsRetryService } from './cases_analytics_retry_service';

View file

@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Logger } from '@kbn/logging';
import type { ElasticsearchClient } from '@kbn/core/server';
import type { RunContext } from '@kbn/task-manager-plugin/server';
import { BackfillTaskRunner } from './backfill_task_runner';
interface CaseAnalyticsIndexBackfillTaskFactoryParams {
logger: Logger;
getESClient: () => Promise<ElasticsearchClient>;
}
export class CaseAnalyticsIndexBackfillTaskFactory {
private readonly logger: Logger;
private readonly getESClient: () => Promise<ElasticsearchClient>;
constructor({ logger, getESClient }: CaseAnalyticsIndexBackfillTaskFactoryParams) {
this.logger = logger;
this.getESClient = getESClient;
}
public create(context: RunContext) {
return new BackfillTaskRunner({
taskInstance: context.taskInstance,
logger: this.logger,
getESClient: this.getESClient,
});
}
}

View file

@ -0,0 +1,149 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks';
import { errors as esErrors } from '@elastic/elasticsearch';
import { BackfillTaskRunner } from './backfill_task_runner';
import type { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server';
import { isRetryableError } from '@kbn/task-manager-plugin/server/task_running';
describe('BackfillTaskRunner', () => {
const logger = loggingSystemMock.createLogger();
const sourceIndex = '.source-index';
const destIndex = '.dest-index';
const sourceQuery = 'source-query';
const taskInstance = {
params: {
sourceIndex,
destIndex,
sourceQuery,
},
} as unknown as ConcreteTaskInstance;
let taskRunner: BackfillTaskRunner;
beforeEach(() => {
jest.clearAllMocks();
});
it('reindexes as expected', async () => {
const esClient = elasticsearchServiceMock.createElasticsearchClient();
const painlessScriptId = 'painlessScriptId';
const painlessScript = {
lang: 'painless',
source: 'ctx._source.remove("foobar");',
};
esClient.indices.getMapping.mockResolvedValue({
[destIndex]: {
mappings: {
_meta: {
painless_script_id: painlessScriptId,
},
},
},
});
esClient.getScript.mockResolvedValueOnce({
found: true,
_id: painlessScriptId,
script: painlessScript,
});
const getESClient = async () => esClient;
taskRunner = new BackfillTaskRunner({
logger,
getESClient,
taskInstance,
});
const result = await taskRunner.run();
expect(esClient.cluster.health).toBeCalledWith({
index: destIndex,
wait_for_status: 'green',
timeout: '300ms',
wait_for_active_shards: 'all',
});
expect(esClient.indices.getMapping).toBeCalledWith({ index: destIndex });
expect(esClient.getScript).toBeCalledWith({ id: painlessScriptId });
expect(esClient.reindex).toBeCalledWith({
source: {
index: sourceIndex,
query: sourceQuery,
},
dest: { index: destIndex },
script: {
id: painlessScriptId,
},
refresh: true,
wait_for_completion: false,
});
expect(result).toEqual({ state: {} });
});
describe('Error handling', () => {
it('calls throwRetryableError if the esClient throws a retryable error', async () => {
const esClient = elasticsearchServiceMock.createElasticsearchClient();
esClient.cluster.health.mockRejectedValueOnce(
new esErrors.ConnectionError('My retryable error')
);
const getESClient = async () => esClient;
taskRunner = new BackfillTaskRunner({
logger,
getESClient,
taskInstance,
});
try {
await taskRunner.run();
} catch (e) {
expect(isRetryableError(e)).toBe(true);
}
expect(esClient.cluster.health).toBeCalledWith({
index: destIndex,
wait_for_status: 'green',
timeout: '300ms',
wait_for_active_shards: 'all',
});
expect(logger.error).toBeCalledWith(
'[.dest-index] Backfill reindex failed. Error: My retryable error',
{ tags: ['cai-backfill', 'cai-backfill-error', '.dest-index'] }
);
});
it('calls throwUnrecoverableError if execution throws a non-retryable error', async () => {
const esClient = elasticsearchServiceMock.createElasticsearchClient();
esClient.cluster.health.mockRejectedValueOnce(new Error('My unrecoverable error'));
const getESClient = async () => esClient;
taskRunner = new BackfillTaskRunner({
logger,
getESClient,
taskInstance,
});
try {
await taskRunner.run();
} catch (e) {
expect(isRetryableError(e)).toBe(null);
}
expect(logger.error).toBeCalledWith(
'[.dest-index] Backfill reindex failed. Error: My unrecoverable error',
{ tags: ['cai-backfill', 'cai-backfill-error', '.dest-index'] }
);
});
});
});

View file

@ -0,0 +1,160 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Logger } from '@kbn/logging';
import {
createTaskRunError,
TaskErrorSource,
throwRetryableError,
throwUnrecoverableError,
type ConcreteTaskInstance,
} from '@kbn/task-manager-plugin/server';
import type { ElasticsearchClient } from '@kbn/core/server';
import type { CancellableTask } from '@kbn/task-manager-plugin/server/task';
import type {
IndicesGetMappingResponse,
QueryDslQueryContainer,
} from '@elastic/elasticsearch/lib/api/types';
import { isRetryableEsClientError } from '../../utils';
interface BackfillTaskRunnerFactoryConstructorParams {
taskInstance: ConcreteTaskInstance;
getESClient: () => Promise<ElasticsearchClient>;
logger: Logger;
}
export class BackfillTaskRunner implements CancellableTask {
private readonly sourceIndex: string;
private readonly destIndex: string;
private readonly sourceQuery: QueryDslQueryContainer;
private readonly getESClient: () => Promise<ElasticsearchClient>;
private readonly logger: Logger;
private readonly errorSource = TaskErrorSource.FRAMEWORK;
constructor({ taskInstance, getESClient, logger }: BackfillTaskRunnerFactoryConstructorParams) {
this.sourceIndex = taskInstance.params.sourceIndex;
this.destIndex = taskInstance.params.destIndex;
this.sourceQuery = taskInstance.params.sourceQuery;
this.getESClient = getESClient;
this.logger = logger;
}
public async run() {
const esClient = await this.getESClient();
try {
await this.waitForDestIndex(esClient);
await this.backfillReindex(esClient);
return {
// One-off tasks are deleted after they run, so this empty state is not
// enough for the periodic sync tasks to know the backfill completed.
state: {},
};
} catch (e) {
if (isRetryableEsClientError(e)) {
throwRetryableError(
createTaskRunError(new Error(this.getErrorMessage(e.message)), this.errorSource),
true
);
}
this.logger.error(`[${this.destIndex}] Backfill reindex failed. Error: ${e.message}`, {
tags: ['cai-backfill', 'cai-backfill-error', this.destIndex],
});
throwUnrecoverableError(createTaskRunError(e, this.errorSource));
}
}
public async cancel() {}
private async backfillReindex(esClient: ElasticsearchClient) {
const painlessScript = await this.getPainlessScript(esClient);
if (painlessScript.found) {
this.logDebug(`Reindexing from ${this.sourceIndex} to ${this.destIndex}.`);
await esClient.reindex({
source: {
index: this.sourceIndex,
query: this.sourceQuery,
},
dest: { index: this.destIndex },
script: {
// Reuse the stored script fetched above rather than resolving the mapping a second time.
id: painlessScript._id,
},
/** If `true`, the request refreshes affected shards to make this operation visible to search. */
refresh: true,
/** We do not wait for the es reindex operation to be completed. */
wait_for_completion: false,
});
} else {
throw createTaskRunError(
new Error(this.getErrorMessage('Painless script not found.')),
this.errorSource
);
}
}
private async getPainlessScript(esClient: ElasticsearchClient) {
const painlessScriptId = await this.getPainlessScriptId(esClient);
this.logDebug(`Getting painless script with id ${painlessScriptId}.`);
return esClient.getScript({
id: painlessScriptId,
});
}
private async getPainlessScriptId(esClient: ElasticsearchClient): Promise<string> {
const currentMapping = await this.getCurrentMapping(esClient);
const painlessScriptId = currentMapping[this.destIndex].mappings._meta?.painless_script_id;
if (!painlessScriptId) {
throw createTaskRunError(
new Error(this.getErrorMessage('Painless script id missing from mapping.')),
this.errorSource
);
}
return painlessScriptId;
}
private async getCurrentMapping(
esClient: ElasticsearchClient
): Promise<IndicesGetMappingResponse> {
this.logDebug('Getting index mapping.');
return esClient.indices.getMapping({
index: this.destIndex,
});
}
private async waitForDestIndex(esClient: ElasticsearchClient) {
this.logDebug('Checking index availability.');
return esClient.cluster.health({
index: this.destIndex,
wait_for_status: 'green',
timeout: '300ms', // this is probably too much
wait_for_active_shards: 'all',
});
}
public logDebug(message: string) {
this.logger.debug(`[${this.destIndex}] ${message}`, {
tags: ['cai-backfill', this.destIndex],
});
}
public getErrorMessage(message: string) {
const errorMessage = `[${this.destIndex}] Backfill reindex failed. Error: ${message}`;
this.logger.error(errorMessage, {
tags: ['cai-backfill', 'cai-backfill-error', this.destIndex],
});
return errorMessage;
}
}
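
For context, `getPainlessScriptId` relies on the destination index declaring its stored script in the mapping `_meta`. A plausible minimal shape is sketched below; the per-index mapping files in this PR define the real mappings, and the script id shown is illustrative.

```ts
import type { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types';

// Illustrative only: the minimal _meta contract the backfill runner depends on.
export const exampleDestIndexMapping: MappingTypeMapping = {
  _meta: {
    painless_script_id: 'cai_cases_script_1', // example id
  },
  properties: {
    // per-index field mappings go here
  },
};
```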

View file

@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export const TASK_TYPE = 'cai:cases_analytics_index_backfill';
export const BACKFILL_RUN_AT = 60 * 1000; // 1 minute, in milliseconds

View file

@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Logger } from '@kbn/logging';
import type {
RunContext,
TaskManagerSetupContract,
TaskManagerStartContract,
} from '@kbn/task-manager-plugin/server';
import type { CoreSetup, ElasticsearchClient } from '@kbn/core/server';
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
import type { CasesServerStartDependencies } from '../../../types';
import { CaseAnalyticsIndexBackfillTaskFactory } from './backfill_task_factory';
import { TASK_TYPE, BACKFILL_RUN_AT } from './constants';
export function registerCAIBackfillTask({
taskManager,
logger,
core,
}: {
taskManager: TaskManagerSetupContract;
logger: Logger;
core: CoreSetup<CasesServerStartDependencies>;
}) {
const getESClient = async (): Promise<ElasticsearchClient> => {
const [{ elasticsearch }] = await core.getStartServices();
return elasticsearch.client.asInternalUser;
};
taskManager.registerTaskDefinitions({
[TASK_TYPE]: {
title: 'Backfill cases analytics indexes.',
maxAttempts: 3,
createTaskRunner: (context: RunContext) => {
return new CaseAnalyticsIndexBackfillTaskFactory({ getESClient, logger }).create(context);
},
},
});
}
export async function scheduleCAIBackfillTask({
taskId,
sourceIndex,
sourceQuery,
destIndex,
taskManager,
logger,
}: {
taskId: string;
sourceIndex: string;
sourceQuery: QueryDslQueryContainer;
destIndex: string;
taskManager: TaskManagerStartContract;
logger: Logger;
}) {
try {
await taskManager.ensureScheduled({
id: taskId,
taskType: TASK_TYPE,
params: { sourceIndex, destIndex, sourceQuery },
runAt: new Date(Date.now() + BACKFILL_RUN_AT), // TODO: one minute for now to ease testing; consider increasing this to five minutes
state: {},
});
} catch (e) {
logger.error(`Error scheduling ${taskId} task, received ${e.message}`);
}
}
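
A hedged example of how a per-index module might call `scheduleCAIBackfillTask`. The task id and source query are illustrative, and the import path assumes the example sits next to this file; the real constants live in the per-index modules.

```ts
import type { Logger } from '@kbn/logging';
import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server';
import { scheduleCAIBackfillTask } from './backfill_task';

// Illustrative call shape only; the id and query are examples, not the real constants.
export async function scheduleCasesBackfillExample(
  taskManager: TaskManagerStartContract,
  logger: Logger
) {
  await scheduleCAIBackfillTask({
    taskId: 'cai_cases_backfill_task', // hypothetical id
    sourceIndex: '.kibana_alerting_cases',
    destIndex: '.internal.cases',
    sourceQuery: { term: { type: 'cases' } }, // hypothetical backfill query
    taskManager,
    logger,
  });
}
```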

View file

@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Logger } from '@kbn/logging';
import type {
IntervalSchedule,
RunContext,
TaskManagerSetupContract,
TaskManagerStartContract,
} from '@kbn/task-manager-plugin/server';
import type { CoreSetup, ElasticsearchClient } from '@kbn/core/server';
import type { CasesServerStartDependencies } from '../../../types';
import { AnalyticsIndexSynchronizationTaskFactory } from './synchronization_task_factory';
const TASK_TYPE = 'cai:cases_analytics_index_synchronization';
const SCHEDULE: IntervalSchedule = { interval: '5m' };
export function registerCAISynchronizationTask({
taskManager,
logger,
core,
}: {
taskManager: TaskManagerSetupContract;
logger: Logger;
core: CoreSetup<CasesServerStartDependencies>;
}) {
const getESClient = async (): Promise<ElasticsearchClient> => {
const [{ elasticsearch }] = await core.getStartServices();
return elasticsearch.client.asInternalUser;
};
taskManager.registerTaskDefinitions({
[TASK_TYPE]: {
title: 'Synchronization for the cases analytics index',
createTaskRunner: (context: RunContext) => {
return new AnalyticsIndexSynchronizationTaskFactory({ getESClient, logger }).create(
context
);
},
},
});
}
/**
* @param {string} destIndex Should be a key of SYNCHRONIZATION_QUERIES_DICTIONARY.
*/
export async function scheduleCAISynchronizationTask({
taskId,
sourceIndex,
destIndex,
taskManager,
logger,
}: {
taskId: string;
sourceIndex: string;
destIndex: string;
taskManager: TaskManagerStartContract;
logger: Logger;
}) {
try {
await taskManager.ensureScheduled({
id: taskId,
taskType: TASK_TYPE,
params: { sourceIndex, destIndex },
schedule: SCHEDULE, // every 5 minutes
state: {},
});
} catch (e) {
logger.error(`Error scheduling ${taskId} task, received ${e.message}`);
}
}
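
And the synchronization counterpart, again with an illustrative task id; `destIndex` has to be a key of `SYNCHRONIZATION_QUERIES_DICTIONARY`, which is why this helper only needs the two index names.

```ts
import type { Logger } from '@kbn/logging';
import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server';
import { scheduleCAISynchronizationTask } from './synchronization_task';

// Illustrative call shape only; the real task id constant lives in the per-index module.
export async function scheduleCasesSyncExample(
  taskManager: TaskManagerStartContract,
  logger: Logger
) {
  await scheduleCAISynchronizationTask({
    taskId: 'cai_cases_synchronization_task', // hypothetical id
    sourceIndex: '.kibana_alerting_cases',
    destIndex: '.internal.cases', // must be a key of SYNCHRONIZATION_QUERIES_DICTIONARY
    taskManager,
    logger,
  });
}
```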

View file

@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Logger } from '@kbn/logging';
import type { ElasticsearchClient } from '@kbn/core/server';
import type { RunContext } from '@kbn/task-manager-plugin/server';
import { SynchronizationTaskRunner } from './synchronization_task_runner';
interface AnalyticsIndexSynchronizationTaskFactoryParams {
logger: Logger;
getESClient: () => Promise<ElasticsearchClient>;
}
export class AnalyticsIndexSynchronizationTaskFactory {
private readonly logger: Logger;
private readonly getESClient: () => Promise<ElasticsearchClient>;
constructor({ logger, getESClient }: AnalyticsIndexSynchronizationTaskFactoryParams) {
this.logger = logger;
this.getESClient = getESClient;
}
public create(context: RunContext) {
return new SynchronizationTaskRunner({
taskInstance: context.taskInstance,
logger: this.logger,
getESClient: this.getESClient,
});
}
}

View file

@ -0,0 +1,387 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks';
import type { TasksTaskInfo } from '@elastic/elasticsearch/lib/api/types';
import { errors as esErrors } from '@elastic/elasticsearch';
import { SynchronizationTaskRunner } from './synchronization_task_runner';
import type { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server';
import { isRetryableError } from '@kbn/task-manager-plugin/server/task_running';
import { CAI_CASES_INDEX_NAME } from '../../cases_index/constants';
describe('SynchronizationTaskRunner', () => {
const logger = loggingSystemMock.createLogger();
const esClient = elasticsearchServiceMock.createElasticsearchClient();
const sourceIndex = '.source-index';
const destIndex = CAI_CASES_INDEX_NAME;
const painlessScriptId = 'painlessScriptId';
const painlessScript = {
lang: 'painless',
source: 'ctx._source.remove("foobar");',
};
const lastSyncSuccess = new Date('2025-06-10T09:25:00.000Z');
const lastSyncAttempt = new Date('2025-06-10T09:30:00.000Z');
const newAttemptTime = new Date('2025-06-10T09:40:00.000Z');
const esReindexTaskId = 'foobar';
const taskInstance = {
params: {
sourceIndex,
destIndex,
},
state: {
lastSyncSuccess,
lastSyncAttempt,
esReindexTaskId,
},
} as unknown as ConcreteTaskInstance;
let taskRunner: SynchronizationTaskRunner;
beforeEach(() => {
jest.clearAllMocks();
jest.useFakeTimers().setSystemTime(newAttemptTime);
esClient.indices.getMapping.mockResolvedValue({
[destIndex]: {
mappings: {
_meta: {
painless_script_id: painlessScriptId,
},
},
},
});
esClient.getScript.mockResolvedValue({
found: true,
_id: painlessScriptId,
script: painlessScript,
});
esClient.reindex.mockResolvedValue({
task: esReindexTaskId,
});
});
afterAll(() => {
jest.useRealTimers();
});
it('reindexes when the previous sync task is completed and the index is available', async () => {
esClient.tasks.get.mockResolvedValueOnce({
completed: true,
task: {} as TasksTaskInfo,
});
const getESClient = async () => esClient;
taskRunner = new SynchronizationTaskRunner({
logger,
getESClient,
taskInstance,
});
const result = await taskRunner.run();
expect(esClient.tasks.get).toBeCalledWith({ task_id: esReindexTaskId });
expect(esClient.cluster.health).toBeCalledWith({
index: destIndex,
wait_for_status: 'green',
timeout: '300ms',
wait_for_active_shards: 'all',
});
expect(esClient.indices.getMapping).toBeCalledWith({ index: destIndex });
expect(esClient.getScript).toBeCalledWith({ id: painlessScriptId });
expect(esClient.reindex).toBeCalledWith({
source: {
index: sourceIndex,
/*
* The previous attempt was successful so we will reindex with
* a new time.
*
* SYNCHRONIZATION_QUERIES_DICTIONARY[destIndex](lastSyncAttempt)
*/
query: {
bool: {
must: [
{
term: {
type: 'cases',
},
},
{
bool: {
should: [
{
range: {
'cases.created_at': {
gte: lastSyncAttempt.toISOString(),
},
},
},
{
range: {
'cases.updated_at': {
gte: lastSyncAttempt.toISOString(),
},
},
},
],
},
},
],
},
},
},
dest: { index: destIndex },
script: {
id: painlessScriptId,
},
refresh: true,
wait_for_completion: false,
});
expect(result).toEqual({
state: {
// because the previous sync task completed, lastSyncSuccess becomes the previous lastSyncAttempt
lastSyncSuccess: lastSyncAttempt,
// we set a new value for lastSyncAttempt
lastSyncAttempt: newAttemptTime,
esReindexTaskId,
},
});
});
it('reindexes using the lookback window when there is no previous sync task and the index is available', async () => {
/*
* If lastSyncSuccess is missing we reindex only SOs that were
* created/updated in the last 5 minutes.
*/
const expectedSyncTime = new Date(newAttemptTime.getTime() - 5 * 60 * 1000);
const getESClient = async () => esClient;
taskRunner = new SynchronizationTaskRunner({
logger,
getESClient,
taskInstance: {
...taskInstance,
state: {},
},
});
const result = await taskRunner.run();
expect(esClient.reindex).toBeCalledWith({
source: {
index: sourceIndex,
/*
* There is no previous sync state, so we reindex using the
* lookback window.
*
* SYNCHRONIZATION_QUERIES_DICTIONARY[destIndex](expectedSyncTime)
*/
query: {
bool: {
must: [
{
term: {
type: 'cases',
},
},
{
bool: {
should: [
{
range: {
'cases.created_at': {
gte: expectedSyncTime.toISOString(),
},
},
},
{
range: {
'cases.updated_at': {
gte: expectedSyncTime.toISOString(),
},
},
},
],
},
},
],
},
},
},
dest: { index: destIndex },
script: {
id: painlessScriptId,
},
refresh: true,
wait_for_completion: false,
});
expect(result).toEqual({
state: {
lastSyncSuccess: undefined,
lastSyncAttempt: newAttemptTime,
esReindexTaskId,
},
});
});
it('returns the previous state if the previous task is still running', async () => {
esClient.tasks.get.mockResolvedValueOnce({
completed: false,
task: {} as TasksTaskInfo,
});
const getESClient = async () => esClient;
taskRunner = new SynchronizationTaskRunner({
logger,
getESClient,
taskInstance,
});
const result = await taskRunner.run();
expect(esClient.reindex).not.toBeCalled();
expect(result).toEqual({
state: taskInstance.state,
});
});
it('reindexes when the previous sync task failed', async () => {
esClient.tasks.get.mockResolvedValueOnce({
completed: true,
task: {} as TasksTaskInfo,
error: { type: 'reindex_error', reason: 'Reindex failed' },
});
const getESClient = async () => esClient;
taskRunner = new SynchronizationTaskRunner({
logger,
getESClient,
taskInstance,
});
const result = await taskRunner.run();
expect(esClient.reindex).toBeCalledWith({
source: {
index: sourceIndex,
/*
* The previous attempt was unsuccessful, so we reindex using the
* old lastSyncSuccess and only update the attempt time.
*
* SYNCHRONIZATION_QUERIES_DICTIONARY[destIndex](lastSyncSuccess)
*/
query: {
bool: {
must: [
{
term: {
type: 'cases',
},
},
{
bool: {
should: [
{
range: {
'cases.created_at': {
gte: lastSyncSuccess.toISOString(),
},
},
},
{
range: {
'cases.updated_at': {
gte: lastSyncSuccess.toISOString(),
},
},
},
],
},
},
],
},
},
},
dest: { index: destIndex },
script: {
id: painlessScriptId,
},
refresh: true,
wait_for_completion: false,
});
expect(result).toEqual({
state: {
// because the previous sync task failed we do not update this value
lastSyncSuccess,
// we set a new value for lastSyncAttempt
lastSyncAttempt: newAttemptTime,
esReindexTaskId,
},
});
});
describe('Error handling', () => {
it('calls throwRetryableError if the esClient throws a retryable error', async () => {
esClient.tasks.get.mockRejectedValueOnce(new esErrors.ConnectionError('My retryable error'));
const getESClient = async () => esClient;
taskRunner = new SynchronizationTaskRunner({
logger,
getESClient,
taskInstance,
});
try {
await taskRunner.run();
} catch (e) {
expect(isRetryableError(e)).toBe(true);
}
expect(logger.error).toBeCalledWith(
'[.internal.cases] Synchronization reindex failed. Error: My retryable error',
{ tags: ['cai-synchronization', 'cai-synchronization-error', '.internal.cases'] }
);
});
it('calls throwUnrecoverableError if execution throws a non-retryable error', async () => {
esClient.tasks.get.mockRejectedValueOnce(new Error('My unrecoverable error'));
const getESClient = async () => esClient;
taskRunner = new SynchronizationTaskRunner({
logger,
getESClient,
taskInstance,
});
try {
await taskRunner.run();
} catch (e) {
expect(isRetryableError(e)).toBe(null);
}
expect(logger.error).toBeCalledWith(
'[.internal.cases] Synchronization reindex failed. Error: My unrecoverable error',
{ tags: ['cai-synchronization', 'cai-synchronization-error', '.internal.cases'] }
);
});
});
});

View file

@ -0,0 +1,300 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Logger } from '@kbn/logging';
import {
createTaskRunError,
TaskErrorSource,
throwRetryableError,
throwUnrecoverableError,
type ConcreteTaskInstance,
} from '@kbn/task-manager-plugin/server';
import type { ElasticsearchClient } from '@kbn/core/server';
import type { CancellableTask } from '@kbn/task-manager-plugin/server/task';
import type {
IndicesGetMappingResponse,
QueryDslQueryContainer,
TaskId,
} from '@elastic/elasticsearch/lib/api/types';
import { isRetryableEsClientError } from '../../utils';
import { SYNCHRONIZATION_QUERIES_DICTIONARY } from '../../constants';
interface SynchronizationTaskRunnerFactoryConstructorParams {
taskInstance: ConcreteTaskInstance;
getESClient: () => Promise<ElasticsearchClient>;
logger: Logger;
}
interface SynchronizationTaskState {
lastSyncSuccess?: Date | undefined;
lastSyncAttempt?: Date | undefined;
esReindexTaskId?: TaskId | undefined;
}
enum ReindexStatus {
RUNNING = 'running',
COMPLETED = 'completed',
FAILED = 'failed',
MISSING_TASK_ID = 'missing_task_id',
}
const LOOKBACK_WINDOW = 5 * 60 * 1000; // 5 minutes, in milliseconds
export class SynchronizationTaskRunner implements CancellableTask {
private readonly sourceIndex: string;
private readonly destIndex: string;
private readonly getESClient: () => Promise<ElasticsearchClient>;
private readonly logger: Logger;
private readonly errorSource = TaskErrorSource.FRAMEWORK;
private readonly esReindexTaskId: TaskId | undefined;
private lastSyncSuccess: Date | undefined;
private lastSyncAttempt: Date | undefined;
constructor({
taskInstance,
getESClient,
logger,
}: SynchronizationTaskRunnerFactoryConstructorParams) {
if (taskInstance.state.lastSyncSuccess)
this.lastSyncSuccess = new Date(taskInstance.state.lastSyncSuccess);
if (taskInstance.state.lastSyncAttempt)
this.lastSyncAttempt = new Date(taskInstance.state.lastSyncAttempt);
this.esReindexTaskId = taskInstance.state.esReindexTaskId;
this.sourceIndex = taskInstance.params.sourceIndex;
this.destIndex = taskInstance.params.destIndex;
this.getESClient = getESClient;
this.logger = logger;
}
public async run() {
const esClient = await this.getESClient();
try {
const previousReindexStatus = await this.getPreviousReindexStatus(esClient);
this.logDebug(`Previous synchronization task status: "${previousReindexStatus}".`);
if (previousReindexStatus === ReindexStatus.RUNNING) {
/*
* If the reindex task is still running we return the
* same state and the next run will cover any missing
* updates.
**/
return {
state: this.getSyncState() as Record<string, unknown>,
};
}
if (previousReindexStatus === ReindexStatus.COMPLETED) {
/*
* If the previous reindex task is completed we reindex
* with a new time window.
**/
await this.isIndexAvailable(esClient);
this.updateLastSyncTimes({ updateSuccessTime: true });
const esReindexTaskId = await this.synchronizeIndex({ esClient });
return {
state: {
lastSyncSuccess: this.lastSyncSuccess,
lastSyncAttempt: this.lastSyncAttempt,
esReindexTaskId,
} as Record<string, unknown>,
};
}
if (
/*
* A missing task id can only happen if this is
* the first task execution.
**/
previousReindexStatus === ReindexStatus.MISSING_TASK_ID ||
previousReindexStatus === ReindexStatus.FAILED
) {
await this.isIndexAvailable(esClient);
/*
* There are two possible scenarios here:
* 1. If the previous task failed (ReindexStatus.FAILED)
* 2. If the state is missing
*
* In both cases we try to reindex without updating lastSyncSuccess.
* This will ensure the reindex query is built with the correct value.
**/
this.updateLastSyncTimes({ updateSuccessTime: false });
const esReindexTaskId = await this.synchronizeIndex({ esClient });
return {
state: {
lastSyncSuccess: this.lastSyncSuccess,
lastSyncAttempt: this.lastSyncAttempt,
esReindexTaskId,
} as Record<string, unknown>,
};
}
throw new Error('Invalid task state.');
} catch (e) {
if (isRetryableEsClientError(e)) {
throwRetryableError(
createTaskRunError(new Error(this.handleErrorMessage(e.message)), this.errorSource),
true
);
}
this.handleErrorMessage(e.message);
throwUnrecoverableError(createTaskRunError(e, this.errorSource));
}
}
private updateLastSyncTimes({ updateSuccessTime }: { updateSuccessTime?: boolean }) {
this.logDebug('Updating lastSyncAttempt and lastSyncSuccess before synchronization.');
if (updateSuccessTime) {
this.lastSyncSuccess = this.lastSyncAttempt;
}
this.lastSyncAttempt = new Date();
}
/**
* Calls elasticsearch to reindex from this.sourceIndex into this.destIndex.
* The query used takes into account the lastSyncSuccess and lastSyncAttempt
* values held by the class.
*
* @returns The id of the ES reindex task, if one was started.
*/
private async synchronizeIndex({
esClient,
}: {
esClient: ElasticsearchClient;
}): Promise<TaskId | undefined> {
const painlessScript = await this.getPainlessScript(esClient);
if (painlessScript.found) {
this.logDebug(`Synchronizing with ${this.sourceIndex}.`);
const sourceQuery = this.buildSourceQuery();
const reindexResponse = await esClient.reindex({
source: {
index: this.sourceIndex,
query: sourceQuery,
},
dest: { index: this.destIndex },
script: {
id: painlessScript._id,
},
/** If `true`, the request refreshes affected shards to make this operation visible to search. */
refresh: true,
/** We do not wait for the es reindex operation to be completed. */
wait_for_completion: false,
});
return reindexResponse.task;
} else {
throw createTaskRunError(
new Error(this.handleErrorMessage('Painless script not found.')),
this.errorSource
);
}
}
private async getPreviousReindexStatus(esClient: ElasticsearchClient): Promise<ReindexStatus> {
this.logDebug('Checking previous synchronization task status.');
if (!this.esReindexTaskId) {
return ReindexStatus.MISSING_TASK_ID;
}
const taskResponse = await esClient.tasks.get({ task_id: this.esReindexTaskId.toString() });
if (!taskResponse.completed) {
return ReindexStatus.RUNNING;
}
if (taskResponse.response?.failures?.length || taskResponse?.error) {
return ReindexStatus.FAILED;
}
return ReindexStatus.COMPLETED;
}
private buildSourceQuery(): QueryDslQueryContainer {
return SYNCHRONIZATION_QUERIES_DICTIONARY[this.destIndex](
new Date(this.lastSyncSuccess ? this.lastSyncSuccess : Date.now() - LOOKBACK_WINDOW)
);
}
private getSyncState(): SynchronizationTaskState {
return {
lastSyncSuccess: this.lastSyncSuccess,
lastSyncAttempt: this.lastSyncAttempt,
esReindexTaskId: this.esReindexTaskId,
};
}
private async getMapping(esClient: ElasticsearchClient): Promise<IndicesGetMappingResponse> {
this.logDebug('Getting index mapping.');
return esClient.indices.getMapping({
index: this.destIndex,
});
}
private async getPainlessScript(esClient: ElasticsearchClient) {
const painlessScriptId = await this.getPainlessScriptId(esClient);
this.logDebug(`Getting painless script with id: "${painlessScriptId}".`);
return esClient.getScript({
id: painlessScriptId,
});
}
private async getPainlessScriptId(esClient: ElasticsearchClient): Promise<string> {
const mapping = await this.getMapping(esClient);
const painlessScriptId = mapping[this.destIndex].mappings._meta?.painless_script_id;
if (!painlessScriptId) {
throw createTaskRunError(
new Error(this.handleErrorMessage('Painless script id missing from mapping.')),
this.errorSource
);
}
return painlessScriptId;
}
private async isIndexAvailable(esClient: ElasticsearchClient) {
this.logDebug('Checking index availability.');
return esClient.cluster.health({
index: this.destIndex,
wait_for_status: 'green',
timeout: '300ms', // this is probably too much
wait_for_active_shards: 'all',
});
}
public logDebug(message: string) {
this.logger.debug(`[${this.destIndex}] ${message}`, {
tags: ['cai-synchronization', this.destIndex],
});
}
public handleErrorMessage(message: string) {
const errorMessage = `[${this.destIndex}] Synchronization reindex failed. Error: ${message}`;
this.logger.error(errorMessage, {
tags: ['cai-synchronization', 'cai-synchronization-error', this.destIndex],
});
return errorMessage;
}
public async cancel() {}
}
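
`buildSourceQuery` above delegates to `SYNCHRONIZATION_QUERIES_DICTIONARY`. Based on the reindex query asserted in the test file above, an entry for the cases index plausibly looks like the sketch below; the authoritative definitions live in the `../../constants` module imported at the top of this file and may differ in detail.

```ts
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';

// Inferred from the query expected in synchronization_task_runner.test.ts.
export const casesSynchronizationQuery = (lastSync: Date): QueryDslQueryContainer => ({
  bool: {
    must: [
      { term: { type: 'cases' } },
      {
        bool: {
          should: [
            { range: { 'cases.created_at': { gte: lastSync.toISOString() } } },
            { range: { 'cases.updated_at': { gte: lastSync.toISOString() } } },
          ],
        },
      },
    ],
  },
});
```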

View file

@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { errors as esErrors } from '@elastic/elasticsearch';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { isRetryableEsClientError } from './utils';
describe('isRetryableEsClientError', () => {
describe('returns `false` for', () => {
test('non-retryable response errors', async () => {
const error = new esErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
body: { error: { type: 'cluster_block_exception' } },
statusCode: 400,
})
);
expect(isRetryableEsClientError(error)).toEqual(false);
});
});
describe('returns `true` for', () => {
it('NoLivingConnectionsError', () => {
const error = new esErrors.NoLivingConnectionsError(
'reason',
elasticsearchClientMock.createApiResponse()
);
expect(isRetryableEsClientError(error)).toEqual(true);
});
it('ConnectionError', () => {
const error = new esErrors.ConnectionError(
'reason',
elasticsearchClientMock.createApiResponse()
);
expect(isRetryableEsClientError(error)).toEqual(true);
});
it('TimeoutError', () => {
const error = new esErrors.TimeoutError(
'reason',
elasticsearchClientMock.createApiResponse()
);
expect(isRetryableEsClientError(error)).toEqual(true);
});
it('ResponseError of type snapshot_in_progress_exception', () => {
const error = new esErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
body: { error: { type: 'snapshot_in_progress_exception' } },
})
);
expect(isRetryableEsClientError(error)).toEqual(true);
});
it.each([503, 504, 401, 403, 408, 410, 429])(
'ResponseError with %p status code',
(statusCode) => {
const error = new esErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode,
body: { error: { type: 'reason' } },
})
);
expect(isRetryableEsClientError(error)).toEqual(true);
}
);
});
});

View file

@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { errors as EsErrors } from '@elastic/elasticsearch';
const retryResponseStatuses = [
401, // AuthenticationException (unauthorized)
403, // AuthorizationException (forbidden)
408, // RequestTimeout
410, // Gone
429, // TooManyRequests -> ES circuit breaker
503, // ServiceUnavailable
504, // GatewayTimeout
];
/**
* Returns true if the given elasticsearch error should be retried
* by retry-based resiliency systems such as the SO migration, false otherwise.
*/
export const isRetryableEsClientError = (e: EsErrors.ElasticsearchClientError): boolean => {
if (
e instanceof EsErrors.NoLivingConnectionsError ||
e instanceof EsErrors.ConnectionError ||
e instanceof EsErrors.TimeoutError ||
(e instanceof EsErrors.ResponseError &&
((e?.statusCode && retryResponseStatuses.includes(e?.statusCode)) ||
// ES returns a 400 Bad Request when trying to close or delete an
// index while snapshots are in progress. This should have been a 503
// so once https://github.com/elastic/elasticsearch/issues/65883 is
// fixed we can remove this.
e?.body?.error?.type === 'snapshot_in_progress_exception'))
) {
return true;
}
return false;
};

View file

@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export { fullJitterBackoffFactory } from './full_jitter_backoff';
export { RetryService } from './retry_service';

View file

@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { loggingSystemMock } from '@kbn/core-logging-browser-mocks';
import type { Logger } from '@kbn/core/server';
import { RetryService } from './retry_service';
import type { BackoffFactory } from './types';
class RetryServiceTestClass extends RetryService {
protected isRetryableError(error: Error) {
return true;
}
}
describe('RetryService', () => {
const nextBackOff = jest.fn();
const cb = jest.fn();
const backOffFactory: BackoffFactory = {
create: () => ({ nextBackOff }),
};
const mockLogger = loggingSystemMock.create().get() as jest.Mocked<Logger>;
let service: RetryService;
beforeEach(() => {
jest.clearAllMocks();
nextBackOff.mockReturnValue(1);
service = new RetryServiceTestClass(mockLogger, backOffFactory, 'foobar');
});
it('should not retry after trying more than the max attempts', async () => {
const maxAttempts = 3;
service = new RetryServiceTestClass(mockLogger, backOffFactory, 'foobar', maxAttempts);
cb.mockRejectedValue(new Error('My transient error'));
await expect(() => service.retryWithBackoff(cb)).rejects.toThrowErrorMatchingInlineSnapshot(
`"My transient error"`
);
expect(cb).toBeCalledTimes(maxAttempts + 1);
expect(nextBackOff).toBeCalledTimes(maxAttempts);
});
it('should retry and eventually succeed when the error is retryable', async () => {
const maxAttempts = 3;
service = new RetryServiceTestClass(mockLogger, backOffFactory, 'foobar', maxAttempts);
const error = new Error('My transient error');
cb.mockRejectedValueOnce(error)
.mockRejectedValueOnce(error)
.mockResolvedValue({ status: 'ok' });
const res = await service.retryWithBackoff(cb);
expect(nextBackOff).toBeCalledTimes(maxAttempts - 1);
expect(cb).toBeCalledTimes(maxAttempts);
expect(res).toEqual({ status: 'ok' });
});
it('should succeed if cb does not throw', async () => {
service = new RetryServiceTestClass(mockLogger, backOffFactory, 'foobar');
cb.mockResolvedValue({ status: 'ok' });
const res = await service.retryWithBackoff(cb);
expect(nextBackOff).toBeCalledTimes(0);
expect(cb).toBeCalledTimes(1);
expect(res).toEqual({ status: 'ok' });
});
describe('Logging', () => {
it('should log a warning when retrying', async () => {
service = new RetryServiceTestClass(mockLogger, backOffFactory, 'foobar', 2);
cb.mockRejectedValue(new Error('My transient error'));
await expect(() => service.retryWithBackoff(cb)).rejects.toThrowErrorMatchingInlineSnapshot(
`"My transient error"`
);
expect(mockLogger.warn).toBeCalledTimes(2);
expect(mockLogger.warn).toHaveBeenNthCalledWith(
1,
'[foobar][retryWithBackoff] Failed with error "My transient error". Attempt for retry: 1'
);
expect(mockLogger.warn).toHaveBeenNthCalledWith(
2,
'[foobar][retryWithBackoff] Failed with error "My transient error". Attempt for retry: 2'
);
});
});
});

View file

@ -6,48 +6,44 @@
*/
import type { Logger } from '@kbn/core/server';
import { CasesConnectorError } from './cases_connector_error';
import type { BackoffStrategy, BackoffFactory } from './types';
import type { BackoffFactory, BackoffStrategy } from './types';
export class CaseConnectorRetryService {
private logger: Logger;
export abstract class RetryService {
protected logger: Logger;
protected readonly serviceName: string;
private maxAttempts: number;
/**
* 409 - Conflict
* 429 - Too Many Requests
* 503 - ES Unavailable
*
* Full list of errors: src/core/packages/saved-objects/server/src/saved_objects_error_helpers.ts
*/
private readonly RETRY_ERROR_STATUS_CODES: number[] = [409, 429, 503];
private readonly backOffStrategy: BackoffStrategy;
private timer: NodeJS.Timeout | null = null;
private attempt: number = 0;
constructor(logger: Logger, backOffFactory: BackoffFactory, maxAttempts: number = 10) {
constructor(
logger: Logger,
backOffFactory: BackoffFactory,
serviceName: string,
maxAttempts: number = 10
) {
this.logger = logger;
this.backOffStrategy = backOffFactory.create();
this.maxAttempts = maxAttempts;
this.serviceName = serviceName;
}
public async retryWithBackoff<T>(cb: () => Promise<T>): Promise<T> {
try {
this.logger.debug(
`[CasesConnector][retryWithBackoff] Running case connector. Attempt: ${this.attempt}`,
`[${this.serviceName}][retryWithBackoff] Running. Attempt: ${this.attempt}`,
{
labels: { attempt: this.attempt },
tags: ['case-connector:retry-start'],
}
);
const res = await cb();
this.logger.debug(
`[CasesConnector][retryWithBackoff] Case connector run successfully after ${this.attempt} attempts`,
`[${this.serviceName}][retryWithBackoff] Run successfully after ${this.attempt} attempts.`,
{
labels: { attempt: this.attempt },
tags: ['case-connector:retry-success'],
}
);
@ -59,9 +55,15 @@ export class CaseConnectorRetryService {
await this.delay();
this.logger.warn(
`[CaseConnector] Case connector failed with status code ${error.statusCode}. Attempt for retry: ${this.attempt}`
);
if (error.statusCode) {
this.logger.warn(
`[${this.serviceName}][retryWithBackoff] Failed with status code ${error.statusCode}. Attempt for retry: ${this.attempt}`
);
} else {
this.logger.warn(
`[${this.serviceName}][retryWithBackoff] Failed with error "${error.message}". Attempt for retry: ${this.attempt}`
);
}
return this.retryWithBackoff(cb);
}
@ -69,10 +71,9 @@ export class CaseConnectorRetryService {
throw error;
} finally {
this.logger.debug(
`[CasesConnector][retryWithBackoff] Case connector run ended after ${this.attempt} attempts`,
`[${this.serviceName}][retryWithBackoff] Run ended after ${this.attempt} attempts.`,
{
labels: { attempt: this.attempt },
tags: ['case-connector:retry-end'],
}
);
}
@ -82,20 +83,7 @@ export class CaseConnectorRetryService {
return this.attempt < this.maxAttempts;
}
private isRetryableError(error: Error) {
if (
error instanceof CasesConnectorError &&
this.RETRY_ERROR_STATUS_CODES.includes(error.statusCode)
) {
return true;
}
this.logger.debug(`[CasesConnector][isRetryableError] Error is not retryable`, {
tags: ['case-connector:retry-error'],
});
return false;
}
protected abstract isRetryableError(error: Error): boolean;
private async delay() {
const ms = this.backOffStrategy.nextBackOff();

View file

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export interface BackoffStrategy {
nextBackOff: () => number;
}
export interface BackoffFactory {
create: () => BackoffStrategy;
}

View file

@ -12,6 +12,7 @@ describe('config validation', () => {
it('sets the defaults correctly', () => {
expect(ConfigSchema.validate({})).toMatchInlineSnapshot(`
Object {
"analytics": Object {},
"files": Object {
"allowedMimeTypes": Array [
"image/aces",

View file

@ -23,6 +23,13 @@ export const ConfigSchema = schema.object({
stack: schema.object({
enabled: schema.boolean({ defaultValue: true }),
}),
analytics: schema.object({
index: schema.maybe(
schema.object({
enabled: schema.boolean({ defaultValue: true }),
})
),
}),
});
export type ConfigType = TypeOf<typeof ConfigSchema>;
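
A quick sketch of how the new setting behaves after validation; the import path assumes a sibling of the config module, and the values are illustrative.

```ts
import { ConfigSchema } from './config';

// When `analytics.index` is provided, `enabled` defaults to true.
const withFlag = ConfigSchema.validate({ analytics: { index: { enabled: true } } });
// withFlag.analytics.index?.enabled === true

// When it is omitted, `analytics.index` stays undefined (see the snapshot above),
// which is why plugin.ts below guards with `analytics.index?.enabled`.
const withoutFlag = ConfigSchema.validate({});
// withoutFlag.analytics.index === undefined
```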

View file

@ -17,10 +17,10 @@ import { CasesOracleService } from './cases_oracle_service';
import { CasesService } from './cases_service';
import { CasesConnectorError } from './cases_connector_error';
import { CaseError } from '../../common/error';
import { fullJitterBackoffFactory } from './full_jitter_backoff';
import { fullJitterBackoffFactory } from '../../common/retry_service/full_jitter_backoff';
jest.mock('./cases_connector_executor');
jest.mock('./full_jitter_backoff');
jest.mock('../../common/retry_service/full_jitter_backoff');
const CasesConnectorExecutorMock = CasesConnectorExecutor as jest.Mock;
const fullJitterBackoffFactoryMock = fullJitterBackoffFactory as jest.Mock;

View file

@ -22,8 +22,8 @@ import {
isCasesConnectorError,
} from './cases_connector_error';
import { CasesConnectorExecutor } from './cases_connector_executor';
import { CaseConnectorRetryService } from './retry_service';
import { fullJitterBackoffFactory } from './full_jitter_backoff';
import { CasesConnectorRetryService } from './cases_connector_retry_service';
import { fullJitterBackoffFactory } from '../../common/retry_service';
import { CASE_RULES_SAVED_OBJECT, CASES_CONNECTOR_SUB_ACTION } from '../../../common/constants';
interface CasesConnectorParams {
@ -43,7 +43,7 @@ export class CasesConnector extends SubActionConnector<
CasesConnectorSecrets
> {
private readonly casesService: CasesService;
private readonly retryService: CaseConnectorRetryService;
private readonly retryService: CasesConnectorRetryService;
private readonly casesParams: CasesConnectorParams['casesParams'];
constructor({ connectorParams, casesParams }: CasesConnectorParams) {
@ -55,7 +55,7 @@ export class CasesConnector extends SubActionConnector<
* We should wait at least 5ms before retrying and no more that 2sec
*/
const backOffFactory = fullJitterBackoffFactory({ baseDelay: 5, maxBackoffTime: 2000 });
this.retryService = new CaseConnectorRetryService(this.logger, backOffFactory);
this.retryService = new CasesConnectorRetryService(this.logger, backOffFactory);
this.casesParams = casesParams;

View file

@ -5,13 +5,14 @@
* 2.0.
*/
import { loggingSystemMock } from '@kbn/core-logging-browser-mocks';
import type { Logger } from '@kbn/core/server';
import { CasesConnectorError } from './cases_connector_error';
import { CaseConnectorRetryService } from './retry_service';
import type { BackoffFactory } from './types';
import { loggingSystemMock } from '@kbn/core-logging-browser-mocks';
describe('CryptoService', () => {
import type { BackoffFactory } from '../../common/retry_service/types';
import { CasesConnectorError } from './cases_connector_error';
import { CasesConnectorRetryService } from './cases_connector_retry_service';
describe('CasesConnectorRetryService', () => {
const nextBackOff = jest.fn();
const cb = jest.fn();
@ -21,13 +22,13 @@ describe('CryptoService', () => {
const mockLogger = loggingSystemMock.create().get() as jest.Mocked<Logger>;
let service: CaseConnectorRetryService;
let service: CasesConnectorRetryService;
beforeEach(() => {
jest.clearAllMocks();
nextBackOff.mockReturnValue(1);
service = new CaseConnectorRetryService(mockLogger, backOffFactory);
service = new CasesConnectorRetryService(mockLogger, backOffFactory);
});
it('should not retry if the error is not CasesConnectorError', async () => {
@ -54,7 +55,7 @@ describe('CryptoService', () => {
it('should not retry after trying more than the max attempts', async () => {
const maxAttempts = 3;
service = new CaseConnectorRetryService(mockLogger, backOffFactory, maxAttempts);
service = new CasesConnectorRetryService(mockLogger, backOffFactory, maxAttempts);
cb.mockRejectedValue(new CasesConnectorError('My transient error', 409));
@ -70,7 +71,7 @@ describe('CryptoService', () => {
'should retry and succeed retryable status code: %s',
async (statusCode) => {
const maxAttempts = 3;
service = new CaseConnectorRetryService(mockLogger, backOffFactory, maxAttempts);
service = new CasesConnectorRetryService(mockLogger, backOffFactory, maxAttempts);
const error = new CasesConnectorError('My transient error', statusCode);
cb.mockRejectedValueOnce(error)
@ -86,7 +87,7 @@ describe('CryptoService', () => {
);
it('should succeed if cb does not throw', async () => {
service = new CaseConnectorRetryService(mockLogger, backOffFactory);
service = new CasesConnectorRetryService(mockLogger, backOffFactory);
cb.mockResolvedValue({ status: 'ok' });
@ -99,7 +100,7 @@ describe('CryptoService', () => {
describe('Logging', () => {
it('should log a warning when retrying', async () => {
service = new CaseConnectorRetryService(mockLogger, backOffFactory, 2);
service = new CasesConnectorRetryService(mockLogger, backOffFactory, 2);
cb.mockRejectedValue(new CasesConnectorError('My transient error', 409));
@ -110,12 +111,12 @@ describe('CryptoService', () => {
expect(mockLogger.warn).toBeCalledTimes(2);
expect(mockLogger.warn).toHaveBeenNthCalledWith(
1,
'[CaseConnector] Case connector failed with status code 409. Attempt for retry: 1'
'[CasesConnector][retryWithBackoff] Failed with status code 409. Attempt for retry: 1'
);
expect(mockLogger.warn).toHaveBeenNthCalledWith(
2,
'[CaseConnector] Case connector failed with status code 409. Attempt for retry: 2'
'[CasesConnector][retryWithBackoff] Failed with status code 409. Attempt for retry: 2'
);
});

View file

@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Logger } from '@kbn/core/server';
import type { BackoffFactory } from '../../common/retry_service/types';
import { CasesConnectorError } from './cases_connector_error';
import { RetryService } from '../../common/retry_service';
export class CasesConnectorRetryService extends RetryService {
/**
* 409 - Conflict
* 429 - Too Many Requests
* 503 - ES Unavailable
*
* Full list of errors: src/core/packages/saved-objects/server/src/saved_objects_error_helpers.ts
*/
private readonly RETRY_ERROR_STATUS_CODES: number[] = [409, 429, 503];
constructor(logger: Logger, backOffFactory: BackoffFactory, maxAttempts: number = 10) {
super(logger, backOffFactory, 'CasesConnector', maxAttempts);
}
protected isRetryableError(error: Error) {
if (
error instanceof CasesConnectorError &&
this.RETRY_ERROR_STATUS_CODES.includes(error.statusCode)
) {
return true;
}
this.logger.debug(`[CasesConnector][isRetryableError] Error is not retryable`, {
tags: ['case-connector:retry-error'],
});
return false;
}
}

View file

@ -79,13 +79,5 @@ export type BulkUpdateOracleRecordRequest = Array<{
payload: Pick<OracleRecordAttributes, 'counter'>;
}>;
export interface BackoffStrategy {
nextBackOff: () => number;
}
export interface BackoffFactory {
create: () => BackoffStrategy;
}
export type CasesConnectorRuleActionParams = TypeOf<typeof CasesConnectorRuleActionParamsSchema>;
export type CasesConnectorParams = TypeOf<typeof CasesConnectorParamsSchema>;

View file

@ -28,6 +28,7 @@ function getConfig(overrides = {}) {
markdownPlugins: { lens: true },
files: { maxSize: 1, allowedMimeTypes: ALLOWED_MIME_TYPES },
stack: { enabled: true },
analytics: {},
...overrides,
};
}
@ -74,6 +75,7 @@ describe('Cases Plugin', () => {
security: securityMock.createStart(),
notifications: notificationsMock.createStart(),
ruleRegistry: { getRacClientWithRequest: jest.fn(), alerting: alertsMock.createStart() },
taskManager: taskManagerMock.createStart(),
};
});

View file

@ -47,6 +47,11 @@ import { registerCaseFileKinds } from './files';
import type { ConfigType } from './config';
import { registerConnectorTypes } from './connectors';
import { registerSavedObjects } from './saved_object_types';
import {
createCasesAnalyticsIndexes,
registerCasesAnalyticsIndexesTasks,
scheduleCasesAnalyticsSyncTasks,
} from './cases_analytics';
export class CasePlugin
implements
@ -66,6 +71,7 @@ export class CasePlugin
private persistableStateAttachmentTypeRegistry: PersistableStateAttachmentTypeRegistry;
private externalReferenceAttachmentTypeRegistry: ExternalReferenceAttachmentTypeRegistry;
private userProfileService: UserProfileService;
private readonly isServerless: boolean;
constructor(private readonly initializerContext: PluginInitializerContext) {
this.caseConfig = initializerContext.config.get<ConfigType>();
@ -75,9 +81,13 @@ export class CasePlugin
this.persistableStateAttachmentTypeRegistry = new PersistableStateAttachmentTypeRegistry();
this.externalReferenceAttachmentTypeRegistry = new ExternalReferenceAttachmentTypeRegistry();
this.userProfileService = new UserProfileService(this.logger);
this.isServerless = initializerContext.env.packageInfo.buildFlavor === 'serverless';
}
public setup(core: CoreSetup, plugins: CasesServerSetupDependencies): CasesServerSetup {
public setup(
core: CoreSetup<CasesServerStartDependencies>,
plugins: CasesServerSetupDependencies
): CasesServerSetup {
this.logger.debug(
`Setting up Case Workflow with core contract [${Object.keys(
core
@ -90,6 +100,11 @@ export class CasePlugin
);
registerCaseFileKinds(this.caseConfig.files, plugins.files, core.security.fips.isEnabled());
registerCasesAnalyticsIndexesTasks({
taskManager: plugins.taskManager,
logger: this.logger,
core,
});
this.securityPluginSetup = plugins.security;
this.lensEmbeddableFactory = plugins.lens.lensEmbeddableFactory;
@ -188,6 +203,15 @@ export class CasePlugin
if (plugins.taskManager) {
scheduleCasesTelemetryTask(plugins.taskManager, this.logger);
if (this.caseConfig.analytics.index?.enabled) {
scheduleCasesAnalyticsSyncTasks({ taskManager: plugins.taskManager, logger: this.logger });
createCasesAnalyticsIndexes({
esClient: core.elasticsearch.client.asInternalUser,
logger: this.logger,
isServerless: this.isServerless,
taskManager: plugins.taskManager,
}).catch(() => {}); // it shouldn't reject, but just in case
}
}
this.userProfileService.initialize({

View file

@ -44,7 +44,7 @@ export interface CasesServerSetupDependencies {
files: FilesSetup;
security: SecurityPluginSetup;
licensing: LicensingPluginSetup;
taskManager?: TaskManagerSetupContract;
taskManager: TaskManagerSetupContract;
usageCollection?: UsageCollectionSetup;
spaces?: SpacesPluginSetup;
cloud?: CloudSetup;
@ -55,7 +55,7 @@ export interface CasesServerStartDependencies {
features: FeaturesPluginStart;
files: FilesStart;
licensing: LicensingPluginStart;
taskManager?: TaskManagerStartContract;
taskManager: TaskManagerStartContract;
security: SecurityPluginStart;
spaces?: SpacesPluginStart;
notifications: NotificationsPluginStart;

View file

@ -82,6 +82,8 @@
"@kbn/code-editor-mock",
"@kbn/monaco",
"@kbn/code-editor",
"@kbn/logging",
"@kbn/core-elasticsearch-client-server-mocks",
"@kbn/core-test-helpers-model-versions",
],
"exclude": [

View file

@ -29,7 +29,7 @@ const createStartMock = () => {
bulkRemove: jest.fn(),
schedule: jest.fn(),
runSoon: jest.fn(),
ensureScheduled: jest.fn(),
ensureScheduled: jest.fn().mockResolvedValue(Promise.resolve()), // it's a promise and there are some places where it's followed by `.catch()`
removeIfExists: jest.fn().mockResolvedValue(Promise.resolve()), // it's a promise and there are some places where it's followed by `.catch()`
bulkUpdateSchedules: jest.fn(),
bulkSchedule: jest.fn(),

View file

@ -143,6 +143,8 @@ export default function ({ getService }: FtrProviderContext) {
'alerts_invalidate_api_keys',
'apm-source-map-migration-task',
'apm-telemetry-task',
'cai:cases_analytics_index_backfill',
'cai:cases_analytics_index_synchronization',
'cases-telemetry-task',
'cloud_security_posture-stats_task',
'dashboard_telemetry',