[Rule Registry] Initialization errors should surface as a framework execution error. (#134280)

* Bubbling up initialization errors

* Rule data client writer throws error when initialization fails or writing is disabled.

* PR feedback

* Updating error message with registration context

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Ying Mao 2022-06-24 11:40:18 -04:00 committed by GitHub
parent a7320122b2
commit 300f7dd84b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 316 additions and 192 deletions

View file

@ -99,7 +99,8 @@ const ruleDataClient = new RuleDataClient({
// to start writing data, call `getWriter().bulk()`. It supports a `namespace`
// property as well, that for instance can be used to write data to a space-specific
// index.
await ruleDataClient.getWriter().bulk({
const writer = await ruleDataClient.getWriter();
await writer.bulk({
body: eventsToIndex.flatMap((event) => [{ index: {} }, event]),
});

View file

@ -15,7 +15,9 @@ type MockInstances<T extends Record<string, any>> = {
type RuleDataClientMock = jest.Mocked<Omit<IRuleDataClient, 'getWriter' | 'getReader'>> & {
getReader: (...args: Parameters<IRuleDataClient['getReader']>) => MockInstances<IRuleDataReader>;
getWriter: (...args: Parameters<IRuleDataClient['getWriter']>) => MockInstances<IRuleDataWriter>;
getWriter: (
...args: Parameters<IRuleDataClient['getWriter']>
) => Promise<MockInstances<IRuleDataWriter>>;
};
export const createRuleDataClientMock = (
@ -37,8 +39,10 @@ export const createRuleDataClientMock = (
getDynamicIndexPattern,
})),
getWriter: jest.fn(() => ({
bulk,
})),
getWriter: jest.fn(() =>
Promise.resolve({
bulk,
})
),
};
};

View file

@ -13,13 +13,14 @@ import { resourceInstallerMock } from '../rule_data_plugin_service/resource_inst
import { loggingSystemMock, elasticsearchServiceMock } from '@kbn/core/server/mocks';
import { IndexPatternsFetcher } from '@kbn/data-plugin/server';
import { createNoMatchingIndicesError } from '@kbn/data-views-plugin/server/fetcher/lib/errors';
import { ElasticsearchClient } from '@kbn/core/server';
const mockLogger = loggingSystemMock.create().get();
const scopedClusterClient = elasticsearchServiceMock.createScopedClusterClient().asInternalUser;
const mockResourceInstaller = resourceInstallerMock.create();
// Be careful setting this delay too high. Jest tests can time out
const delay = (ms: number = 3000) => new Promise((resolve) => setTimeout(resolve, ms));
const delay = (ms: number = 1500) => new Promise((resolve) => setTimeout(resolve, ms));
interface GetRuleDataClientOptionsOpts {
isWriteEnabled?: boolean;
@ -184,132 +185,176 @@ describe('RuleDataClient', () => {
jest.clearAllMocks();
});
describe('bulk()', () => {
test('logs debug and returns undefined if writing is disabled', async () => {
const ruleDataClient = new RuleDataClient(
getRuleDataClientOptions({ isWriteEnabled: false })
);
const writer = ruleDataClient.getWriter();
test('throws error if writing is disabled', async () => {
const ruleDataClient = new RuleDataClient(
getRuleDataClientOptions({ isWriteEnabled: false })
);
// Previously, a delay between calling getWriter() and using a writer function
// would cause an Unhandled promise rejection if there were any errors getting a writer
// Adding this delay in the tests to ensure this does not pop up again.
await delay();
await expect(() => ruleDataClient.getWriter()).rejects.toThrowErrorMatchingInlineSnapshot(
`"Rule registry writing is disabled. Make sure that \\"xpack.ruleRegistry.write.enabled\\" configuration is not set to false and \\"observability.apm\\" is not disabled in \\"xpack.ruleRegistry.write.disabledRegistrationContexts\\" within \\"kibana.yml\\"."`
);
expect(mockLogger.debug).toHaveBeenCalledWith(
`Writing is disabled, bulk() will not write any data.`
);
});
expect(await writer.bulk({})).toEqual(undefined);
expect(mockLogger.debug).toHaveBeenCalledWith(
`Writing is disabled, bulk() will not write any data.`
);
test('throws error if initialization of writer fails due to index error', async () => {
const ruleDataClient = new RuleDataClient(
getRuleDataClientOptions({
waitUntilReadyForWriting: Promise.resolve(
left(new Error('could not get cluster client'))
),
})
);
expect(ruleDataClient.isWriteEnabled()).toBe(true);
await expect(() => ruleDataClient.getWriter()).rejects.toThrowErrorMatchingInlineSnapshot(
`"There has been a catastrophic error trying to install index level resources for the following registration context: observability.apm. This may have been due to a non-additive change to the mappings, removal and type changes are not permitted. Full error: Error: could not get cluster client"`
);
expect(mockLogger.error).toHaveBeenNthCalledWith(
1,
new RuleDataWriterInitializationError(
'index',
'observability.apm',
new Error('could not get cluster client')
)
);
expect(mockLogger.error).toHaveBeenNthCalledWith(
2,
`The writer for the Rule Data Client for the observability.apm registration context was not initialized properly, bulk() cannot continue, and writing will be disabled.`
);
expect(ruleDataClient.isWriteEnabled()).toBe(false);
// getting the writer again at this point should throw another error
await expect(() => ruleDataClient.getWriter()).rejects.toThrowErrorMatchingInlineSnapshot(
`"Rule registry writing is disabled due to an error during Rule Data Client initialization."`
);
expect(mockLogger.debug).toHaveBeenCalledWith(
`Writing is disabled, bulk() will not write any data.`
);
});
test('throws error if initialization of writer fails due to namespace error', async () => {
mockResourceInstaller.installAndUpdateNamespaceLevelResources.mockRejectedValueOnce(
new Error('bad resource installation')
);
const ruleDataClient = new RuleDataClient(getRuleDataClientOptions({}));
expect(ruleDataClient.isWriteEnabled()).toBe(true);
await expect(() => ruleDataClient.getWriter()).rejects.toThrowErrorMatchingInlineSnapshot(
`"There has been a catastrophic error trying to install namespace level resources for the following registration context: observability.apm. This may have been due to a non-additive change to the mappings, removal and type changes are not permitted. Full error: Error: bad resource installation"`
);
expect(mockLogger.error).toHaveBeenNthCalledWith(
1,
new RuleDataWriterInitializationError(
'namespace',
'observability.apm',
new Error('bad resource installation')
)
);
expect(mockLogger.error).toHaveBeenNthCalledWith(
2,
`The writer for the Rule Data Client for the observability.apm registration context was not initialized properly, bulk() cannot continue, and writing will be disabled.`
);
expect(ruleDataClient.isWriteEnabled()).toBe(false);
// getting the writer again at this point should throw another error
await expect(() => ruleDataClient.getWriter()).rejects.toThrowErrorMatchingInlineSnapshot(
`"Rule registry writing is disabled due to an error during Rule Data Client initialization."`
);
expect(mockLogger.debug).toHaveBeenCalledWith(
`Writing is disabled, bulk() will not write any data.`
);
});
test('uses cached cluster client when repeatedly initializing writer', async () => {
const ruleDataClient = new RuleDataClient(getRuleDataClientOptions({}));
await ruleDataClient.getWriter();
await ruleDataClient.getWriter();
await ruleDataClient.getWriter();
await ruleDataClient.getWriter();
await ruleDataClient.getWriter();
expect(mockResourceInstaller.installAndUpdateNamespaceLevelResources).toHaveBeenCalledTimes(
1
);
});
});
describe('bulk()', () => {
test('logs debug and returns undefined if clusterClient is not defined', async () => {
const ruleDataClient = new RuleDataClient(
getRuleDataClientOptions({
waitUntilReadyForWriting: new Promise((resolve) =>
resolve(right(undefined as unknown as ElasticsearchClient))
),
})
);
const writer = await ruleDataClient.getWriter();
// Previously, a delay between calling getWriter() and using a writer function
// would cause an Unhandled promise rejection if there were any errors getting a writer
// Adding this delay in the tests to ensure this does not pop up again.
await delay();
expect(await writer.bulk({})).toEqual(undefined);
expect(mockLogger.debug).toHaveBeenCalledWith(
`Writing is disabled, bulk() will not write any data.`
);
});
test('throws and logs error if bulk function throws error', async () => {
const error = new Error('something went wrong!');
scopedClusterClient.bulk.mockRejectedValueOnce(error);
const ruleDataClient = new RuleDataClient(getRuleDataClientOptions({}));
expect(ruleDataClient.isWriteEnabled()).toBe(true);
const writer = await ruleDataClient.getWriter();
// Previously, a delay between calling getWriter() and using a writer function
// would cause an Unhandled promise rejection if there were any errors getting a writer
// Adding this delay in the tests to ensure this does not pop up again.
await delay();
await expect(() => writer.bulk({})).rejects.toThrowErrorMatchingInlineSnapshot(
`"something went wrong!"`
);
expect(mockLogger.error).toHaveBeenNthCalledWith(1, error);
expect(ruleDataClient.isWriteEnabled()).toBe(true);
});
test('waits until cluster client is ready before calling bulk', async () => {
const ruleDataClient = new RuleDataClient(
getRuleDataClientOptions({
waitUntilReadyForWriting: new Promise((resolve) =>
setTimeout(resolve, 3000, right(scopedClusterClient))
),
})
);
const writer = await ruleDataClient.getWriter();
// Previously, a delay between calling getWriter() and using a writer function
// would cause an Unhandled promise rejection if there were any errors getting a writer
// Adding this delay in the tests to ensure this does not pop up again.
await delay();
const response = await writer.bulk({});
expect(response).toEqual({
body: {},
headers: {
'x-elastic-product': 'Elasticsearch',
},
meta: {},
statusCode: 200,
warnings: [],
});
test('logs error, returns undefined and turns off writing if initialization error', async () => {
const ruleDataClient = new RuleDataClient(
getRuleDataClientOptions({
waitUntilReadyForWriting: Promise.resolve(
left(new Error('could not get cluster client'))
),
})
);
expect(ruleDataClient.isWriteEnabled()).toBe(true);
const writer = ruleDataClient.getWriter();
// Previously, a delay between calling getWriter() and using a writer function
// would cause an Unhandled promise rejection if there were any errors getting a writer
// Adding this delay in the tests to ensure this does not pop up again.
await delay();
expect(await writer.bulk({})).toEqual(undefined);
expect(mockLogger.error).toHaveBeenNthCalledWith(
1,
new RuleDataWriterInitializationError(
'index',
'observability.apm',
new Error('could not get cluster client')
)
);
expect(mockLogger.error).toHaveBeenNthCalledWith(
2,
`The writer for the Rule Data Client for the observability.apm registration context was not initialized properly, bulk() cannot continue, and writing will be disabled.`
);
expect(ruleDataClient.isWriteEnabled()).toBe(false);
});
test('logs error, returns undefined and turns off writing if resource installation error', async () => {
const error = new Error('bad resource installation');
mockResourceInstaller.installAndUpdateNamespaceLevelResources.mockRejectedValueOnce(error);
const ruleDataClient = new RuleDataClient(getRuleDataClientOptions({}));
expect(ruleDataClient.isWriteEnabled()).toBe(true);
const writer = ruleDataClient.getWriter();
// Previously, a delay between calling getWriter() and using a writer function
// would cause an Unhandled promise rejection if there were any errors getting a writer
// Adding this delay in the tests to ensure this does not pop up again.
await delay();
expect(await writer.bulk({})).toEqual(undefined);
expect(mockLogger.error).toHaveBeenNthCalledWith(
1,
new RuleDataWriterInitializationError('namespace', 'observability.apm', error)
);
expect(mockLogger.error).toHaveBeenNthCalledWith(
2,
`The writer for the Rule Data Client for the observability.apm registration context was not initialized properly, bulk() cannot continue, and writing will be disabled.`
);
expect(ruleDataClient.isWriteEnabled()).toBe(false);
});
test('logs error and returns undefined if bulk function throws error', async () => {
const error = new Error('something went wrong!');
scopedClusterClient.bulk.mockRejectedValueOnce(error);
const ruleDataClient = new RuleDataClient(getRuleDataClientOptions({}));
expect(ruleDataClient.isWriteEnabled()).toBe(true);
const writer = ruleDataClient.getWriter();
// Previously, a delay between calling getWriter() and using a writer function
// would cause an Unhandled promise rejection if there were any errors getting a writer
// Adding this delay in the tests to ensure this does not pop up again.
await delay();
expect(await writer.bulk({})).toEqual(undefined);
expect(mockLogger.error).toHaveBeenNthCalledWith(1, error);
expect(ruleDataClient.isWriteEnabled()).toBe(true);
});
test('waits until cluster client is ready before calling bulk', async () => {
const ruleDataClient = new RuleDataClient(
getRuleDataClientOptions({
waitUntilReadyForWriting: new Promise((resolve) =>
setTimeout(resolve, 3000, right(scopedClusterClient))
),
})
);
const writer = ruleDataClient.getWriter();
// Previously, a delay between calling getWriter() and using a writer function
// would cause an Unhandled promise rejection if there were any errors getting a writer
// Adding this delay in the tests to ensure this does not pop up again.
await delay();
const response = await writer.bulk({});
expect(response).toEqual({
body: {},
headers: {
'x-elastic-product': 'Elasticsearch',
},
meta: {},
statusCode: 200,
warnings: [],
});
expect(scopedClusterClient.bulk).toHaveBeenCalledWith(
{
index: `.alerts-observability.apm.alerts-default`,
require_alias: true,
},
{ meta: true }
);
});
expect(scopedClusterClient.bulk).toHaveBeenCalledWith(
{
index: `.alerts-observability.apm.alerts-default`,
require_alias: true,
},
{ meta: true }
);
});
});
});

View file

@ -35,11 +35,14 @@ export type WaitResult = Either<Error, ElasticsearchClient>;
export class RuleDataClient implements IRuleDataClient {
private _isWriteEnabled: boolean = false;
private _isWriteInitializationFailed: boolean = false;
private _isWriterCacheEnabled: boolean = true;
// Writers cached by namespace
private writerCache: Map<string, IRuleDataWriter>;
private clusterClient: ElasticsearchClient | null = null;
constructor(private readonly options: RuleDataClientConstructorOptions) {
this.writeEnabled = this.options.isWriteEnabled;
this.writerCacheEnabled = this.options.isWriterCacheEnabled;
@ -66,6 +69,14 @@ export class RuleDataClient implements IRuleDataClient {
this._isWriteEnabled = isEnabled;
}
private get writeInitializationFailed(): boolean {
return this._isWriteInitializationFailed;
}
private set writeInitializationFailed(isFailed: boolean) {
this._isWriteInitializationFailed = isFailed;
}
public isWriteEnabled(): boolean {
return this.writeEnabled;
}
@ -131,14 +142,14 @@ export class RuleDataClient implements IRuleDataClient {
};
}
public getWriter(options: { namespace?: string } = {}): IRuleDataWriter {
public async getWriter(options: { namespace?: string } = {}): Promise<IRuleDataWriter> {
const namespace = options.namespace || 'default';
const cachedWriter = this.writerCache.get(namespace);
const isWriterCacheEnabled = () => this.writerCacheEnabled;
// There is no cached writer, so we'll install / update the namespace specific resources now.
if (!isWriterCacheEnabled() || !cachedWriter) {
const writerForNamespace = this.initializeWriter(namespace);
const writerForNamespace = await this.initializeWriter(namespace);
this.writerCache.set(namespace, writerForNamespace);
return writerForNamespace;
} else {
@ -146,19 +157,27 @@ export class RuleDataClient implements IRuleDataClient {
}
}
private initializeWriter(namespace: string): IRuleDataWriter {
private async initializeWriter(namespace: string): Promise<IRuleDataWriter> {
const isWriteEnabled = () => this.writeEnabled;
const turnOffWrite = () => (this.writeEnabled = false);
const isWriteInitializationError = () => this.writeInitializationFailed;
const turnOffWriteDueToInitializationError = () => {
this.writeEnabled = false;
this.writeInitializationFailed = true;
};
const { indexInfo, resourceInstaller } = this.options;
const alias = indexInfo.getPrimaryAlias(namespace);
// Wait until both index and namespace level resources have been installed / updated.
const prepareForWriting = async () => {
if (!isWriteEnabled()) {
throw new RuleDataWriteDisabledError();
}
if (!isWriteEnabled()) {
this.options.logger.debug(`Writing is disabled, bulk() will not write any data.`);
throw new RuleDataWriteDisabledError({
reason: isWriteInitializationError() ? 'error' : 'config',
registrationContext: indexInfo.indexOptions.registrationContext,
});
}
try {
const indexLevelResourcesResult = await this.options.waitUntilReadyForWriting;
if (isLeft(indexLevelResourcesResult)) {
@ -170,7 +189,7 @@ export class RuleDataClient implements IRuleDataClient {
} else {
try {
await resourceInstaller.installAndUpdateNamespaceLevelResources(indexInfo, namespace);
return indexLevelResourcesResult.right;
this.clusterClient = indexLevelResourcesResult.right;
} catch (e) {
throw new RuleDataWriterInitializationError(
'namespace',
@ -179,33 +198,31 @@ export class RuleDataClient implements IRuleDataClient {
);
}
}
};
const prepareForWritingResult = prepareForWriting().catch((error) => {
} catch (error) {
if (error instanceof RuleDataWriterInitializationError) {
this.options.logger.error(error);
this.options.logger.error(
`The writer for the Rule Data Client for the ${indexInfo.indexOptions.registrationContext} registration context was not initialized properly, bulk() cannot continue, and writing will be disabled.`
);
turnOffWrite();
} else if (error instanceof RuleDataWriteDisabledError) {
this.options.logger.debug(`Writing is disabled, bulk() will not write any data.`);
turnOffWriteDueToInitializationError();
}
return undefined;
});
throw error;
}
return {
bulk: async (request: estypes.BulkRequest) => {
try {
const clusterClient = await prepareForWritingResult;
if (clusterClient) {
if (this.clusterClient) {
const requestWithDefaultParameters = {
...request,
require_alias: true,
index: alias,
};
const response = await clusterClient.bulk(requestWithDefaultParameters, { meta: true });
const response = await this.clusterClient.bulk(requestWithDefaultParameters, {
meta: true,
});
if (response.body.errors) {
const error = new errors.ResponseError(response);
@ -215,11 +232,9 @@ export class RuleDataClient implements IRuleDataClient {
} else {
this.options.logger.debug(`Writing is disabled, bulk() will not write any data.`);
}
return undefined;
} catch (error) {
this.options.logger.error(error);
return undefined;
throw error;
}
},
};

View file

@ -19,7 +19,7 @@ export interface IRuleDataClient {
kibanaVersion: string;
isWriteEnabled(): boolean;
getReader(options?: { namespace?: string }): IRuleDataReader;
getWriter(options?: { namespace?: string }): IRuleDataWriter;
getWriter(options?: { namespace?: string }): Promise<IRuleDataWriter>;
}
export interface IRuleDataReader {

View file

@ -7,8 +7,28 @@
/* eslint-disable max-classes-per-file */
export class RuleDataWriteDisabledError extends Error {
constructor(message?: string) {
super(message);
constructor({
reason,
registrationContext,
message,
}: {
reason: 'config' | 'error';
registrationContext?: string;
message?: string;
}) {
let errMessage = message;
if (!errMessage) {
if (reason === 'config') {
if (registrationContext) {
errMessage = `Rule registry writing is disabled. Make sure that "xpack.ruleRegistry.write.enabled" configuration is not set to false and "${registrationContext}" is not disabled in "xpack.ruleRegistry.write.disabledRegistrationContexts" within "kibana.yml".`;
} else {
errMessage = `Rule registry writing is disabled. Make sure that "xpack.ruleRegistry.write.enabled" configuration is not set to false within "kibana.yml".`;
}
} else if (reason === 'error') {
errMessage = `Rule registry writing is disabled due to an error during Rule Data Client initialization.`;
}
}
super(errMessage);
Object.setPrototypeOf(this, new.target.prototype);
this.name = 'RuleDataWriteDisabledError';
}
@ -20,8 +40,9 @@ export class RuleDataWriterInitializationError extends Error {
registrationContext: string,
error: string | Error
) {
super(`There has been a catastrophic error trying to install ${resourceType} level resources for the following registration context: ${registrationContext}.
This may have been due to a non-additive change to the mappings, removal and type changes are not permitted. Full error: ${error.toString()}`);
super(
`There has been a catastrophic error trying to install ${resourceType} level resources for the following registration context: ${registrationContext}. This may have been due to a non-additive change to the mappings, removal and type changes are not permitted. Full error: ${error.toString()}`
);
Object.setPrototypeOf(this, new.target.prototype);
this.name = 'RuleDataWriterInitializationError';
}

View file

@ -86,7 +86,7 @@ describe('createLifecycleExecutor', () => {
})
);
expect(ruleDataClientMock.getWriter().bulk).toHaveBeenCalledWith(
expect((await ruleDataClientMock.getWriter()).bulk).toHaveBeenCalledWith(
expect.objectContaining({
body: [
// alert documents
@ -107,7 +107,7 @@ describe('createLifecycleExecutor', () => {
],
})
);
expect(ruleDataClientMock.getWriter().bulk).not.toHaveBeenCalledWith(
expect((await ruleDataClientMock.getWriter()).bulk).not.toHaveBeenCalledWith(
expect.objectContaining({
body: expect.arrayContaining([
// evaluation documents
@ -201,7 +201,7 @@ describe('createLifecycleExecutor', () => {
})
);
expect(ruleDataClientMock.getWriter().bulk).toHaveBeenCalledWith(
expect((await ruleDataClientMock.getWriter()).bulk).toHaveBeenCalledWith(
expect.objectContaining({
body: [
// alert document
@ -227,7 +227,7 @@ describe('createLifecycleExecutor', () => {
],
})
);
expect(ruleDataClientMock.getWriter().bulk).not.toHaveBeenCalledWith(
expect((await ruleDataClientMock.getWriter()).bulk).not.toHaveBeenCalledWith(
expect.objectContaining({
body: expect.arrayContaining([
// evaluation documents
@ -316,7 +316,7 @@ describe('createLifecycleExecutor', () => {
})
);
expect(ruleDataClientMock.getWriter().bulk).toHaveBeenCalledWith(
expect((await ruleDataClientMock.getWriter()).bulk).toHaveBeenCalledWith(
expect.objectContaining({
body: expect.arrayContaining([
// alert document
@ -338,7 +338,7 @@ describe('createLifecycleExecutor', () => {
]),
})
);
expect(ruleDataClientMock.getWriter().bulk).not.toHaveBeenCalledWith(
expect((await ruleDataClientMock.getWriter()).bulk).not.toHaveBeenCalledWith(
expect.objectContaining({
body: expect.arrayContaining([
// evaluation documents
@ -375,7 +375,35 @@ describe('createLifecycleExecutor', () => {
})
);
expect(ruleDataClientMock.getWriter).not.toHaveBeenCalled();
expect((await ruleDataClientMock.getWriter()).bulk).not.toHaveBeenCalled();
});
it('throws error when writer initialization fails', async () => {
const logger = loggerMock.create();
const ruleDataClientMock = createRuleDataClientMock();
ruleDataClientMock.getWriter = jest
.fn()
.mockRejectedValueOnce(new Error('error initializing!'));
const executor = createLifecycleExecutor(
logger,
ruleDataClientMock
)<{}, TestRuleState, never, never, never>(async (options) => {
const nextRuleState: TestRuleState = {
aRuleStateKey: 'NEXT_RULE_STATE_VALUE',
};
return nextRuleState;
});
await expect(() =>
executor(
createDefaultAlertExecutorOptions({
params: {},
state: { wrapped: initialRuleState, trackedAlerts: {} },
shouldWriteAlerts: false,
})
)
).rejects.toThrowErrorMatchingInlineSnapshot(`"error initializing!"`);
});
});

View file

@ -145,6 +145,8 @@ export const createLifecycleExecutor =
state: previousState,
} = options;
const ruleDataClientWriter = await ruleDataClient.getWriter();
const state = getOrElse(
(): WrappedLifecycleRuleState<State> => ({
wrapped: previousState as State,
@ -267,7 +269,7 @@ export const createLifecycleExecutor =
if (allEventsToIndex.length > 0 && writeAlerts) {
logger.debug(`[Rule Registry] Preparing to index ${allEventsToIndex.length} alerts.`);
await ruleDataClient.getWriter().bulk({
await ruleDataClientWriter.bulk({
body: allEventsToIndex.flatMap(({ event, indexName }) => [
indexName
? { index: { _id: event[ALERT_UUID]!, _index: indexName, require_alias: false } }
@ -275,6 +277,10 @@ export const createLifecycleExecutor =
event,
]),
});
} else {
logger.debug(
`[Rule Registry] Not indexing ${allEventsToIndex.length} alerts because writing has been disabled.`
);
}
const nextTrackedAlerts = Object.fromEntries(

View file

@ -158,7 +158,7 @@ describe('createLifecycleRuleTypeFactory', () => {
},
]);
expect(helpers.ruleDataClientMock.getWriter().bulk).toHaveBeenCalledTimes(0);
expect((await helpers.ruleDataClientMock.getWriter()).bulk).toHaveBeenCalledTimes(0);
});
});
@ -178,7 +178,7 @@ describe('createLifecycleRuleTypeFactory', () => {
},
]);
expect(helpers.ruleDataClientMock.getWriter().bulk).toHaveBeenCalledTimes(0);
expect((await helpers.ruleDataClientMock.getWriter()).bulk).toHaveBeenCalledTimes(0);
});
});
@ -200,10 +200,10 @@ describe('createLifecycleRuleTypeFactory', () => {
]);
});
it('writes the correct alerts', () => {
expect(helpers.ruleDataClientMock.getWriter().bulk).toHaveBeenCalledTimes(1);
it('writes the correct alerts', async () => {
expect((await helpers.ruleDataClientMock.getWriter()).bulk).toHaveBeenCalledTimes(1);
const body = helpers.ruleDataClientMock.getWriter().bulk.mock.calls[0][0].body!;
const body = (await helpers.ruleDataClientMock.getWriter()).bulk.mock.calls[0][0].body!;
const documents = body.filter((op: any) => !('index' in op)) as any[];
@ -302,9 +302,10 @@ describe('createLifecycleRuleTypeFactory', () => {
]);
// TODO mock the resolved value before calling alertWithLifecycle again
const lastOpbeansNodeDoc = helpers.ruleDataClientMock
.getWriter()
.bulk.mock.calls[0][0].body?.concat()
const lastOpbeansNodeDoc = (
await helpers.ruleDataClientMock.getWriter()
).bulk.mock.calls[0][0].body
?.concat()
.reverse()
.find(
(doc: any) => !('index' in doc) && doc['service.name'] === 'opbeans-node'
@ -345,9 +346,9 @@ describe('createLifecycleRuleTypeFactory', () => {
]);
});
it('writes the correct alerts', () => {
expect(helpers.ruleDataClientMock.getWriter().bulk).toHaveBeenCalledTimes(2);
const body = helpers.ruleDataClientMock.getWriter().bulk.mock.calls[1][0].body!;
it('writes the correct alerts', async () => {
expect((await helpers.ruleDataClientMock.getWriter()).bulk).toHaveBeenCalledTimes(2);
const body = (await helpers.ruleDataClientMock.getWriter()).bulk.mock.calls[1][0].body!;
const documents = body.filter((op: any) => !('index' in op)) as any[];
@ -383,9 +384,10 @@ describe('createLifecycleRuleTypeFactory', () => {
},
]);
const lastOpbeansNodeDoc = helpers.ruleDataClientMock
.getWriter()
.bulk.mock.calls[0][0].body?.concat()
const lastOpbeansNodeDoc = (
await helpers.ruleDataClientMock.getWriter()
).bulk.mock.calls[0][0].body
?.concat()
.reverse()
.find(
(doc: any) => !('index' in doc) && doc['service.name'] === 'opbeans-node'
@ -418,10 +420,10 @@ describe('createLifecycleRuleTypeFactory', () => {
]);
});
it('writes the correct alerts', () => {
expect(helpers.ruleDataClientMock.getWriter().bulk).toHaveBeenCalledTimes(2);
it('writes the correct alerts', async () => {
expect((await helpers.ruleDataClientMock.getWriter()).bulk).toHaveBeenCalledTimes(2);
const body = helpers.ruleDataClientMock.getWriter().bulk.mock.calls[1][0].body!;
const body = (await helpers.ruleDataClientMock.getWriter()).bulk.mock.calls[1][0].body!;
const documents = body.filter((op: any) => !('index' in op)) as any[];

View file

@ -26,6 +26,10 @@ export const createPersistenceRuleTypeWrapper: CreatePersistenceRuleTypeWrapper
const numAlerts = alerts.length;
logger.debug(`Found ${numAlerts} alerts.`);
const ruleDataClientWriter = await ruleDataClient.getWriter({
namespace: options.spaceId,
});
// Only write alerts if:
// - writing is enabled
// AND
@ -92,15 +96,13 @@ export const createPersistenceRuleTypeWrapper: CreatePersistenceRuleTypeWrapper
};
});
const response = await ruleDataClient
.getWriter({ namespace: options.spaceId })
.bulk({
body: augmentedAlerts.flatMap((alert) => [
{ create: { _id: alert._id } },
alert._source,
]),
refresh,
});
const response = await ruleDataClientWriter.bulk({
body: augmentedAlerts.flatMap((alert) => [
{ create: { _id: alert._id } },
alert._source,
]),
refresh,
});
if (response == null) {
return { createdAlerts: [], errors: {} };

View file

@ -89,7 +89,7 @@ describe('Custom Query Alerts', () => {
params,
});
expect(ruleDataClient.getWriter().bulk).not.toHaveBeenCalled();
expect((await ruleDataClient.getWriter()).bulk).not.toHaveBeenCalled();
expect(eventsTelemetry.queueTelemetryEvents).not.toHaveBeenCalled();
});
@ -131,7 +131,7 @@ describe('Custom Query Alerts', () => {
await executor({ params });
expect(ruleDataClient.getWriter().bulk).toHaveBeenCalled();
expect((await ruleDataClient.getWriter()).bulk).toHaveBeenCalled();
expect(eventsTelemetry.queueTelemetryEvents).toHaveBeenCalled();
});
});