[Response Ops][Alerting] Skip alerts resource installation on migrator nodes (#169443)

Resolves https://github.com/elastic/response-ops-team/issues/148

## Summary

This PR makes the following changes:
* Skips the initialization of the `AlertsService` for migrator nodes.
This bypasses the installation of alerts as data assets on migrator
nodes, which are short-lived nodes used on Serverless
* Changes error logs to debug logs when alert resource installation is
cut short due to Kibana shutdown.

## To Verify

Run Kibana with the following configs:
* no config for `node.roles` - Kibana should start up and install
alerts-as-data resources
* `node.roles: ["background_tasks"]` - Kibana should start up and
install alerts-as-data resources
* `node.roles: ["ui"]` - Kibana should start up and install
alerts-as-data resources
* `node.roles: ["migrator"]` - Kibana should start up and not install
alerts-as-data resources. No errors related to premature shutdown should
be visible in the logs.
This commit is contained in:
Ying Mao 2023-10-23 11:16:01 -04:00 committed by GitHub
parent 38ca94f8b7
commit 3156d8b9e3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 87 additions and 32 deletions

View file

@ -2370,10 +2370,10 @@ describe('Alerts Service', () => {
dataStreamAdapter,
});
await retryUntil('error logger called', async () => logger.error.mock.calls.length > 0);
await retryUntil('debug logger called', async () => logger.debug.mock.calls.length > 0);
expect(logger.error).toHaveBeenCalledWith(
new Error(`Server is stopping; must stop all async operations`)
expect(logger.debug).toHaveBeenCalledWith(
`Server is stopping; must stop all async operations`
);
});
});

View file

@ -40,6 +40,7 @@ import {
createOrUpdateIndexTemplate,
createConcreteWriteIndex,
installWithTimeout,
InstallShutdownError,
} from './lib';
import type { LegacyAlertsClientParams, AlertRuleData } from '../alerts_client';
import { AlertsClient } from '../alerts_client';
@ -357,9 +358,14 @@ export class AlertsService implements IAlertsService {
this.isInitializing = false;
return successResult();
} catch (err) {
this.options.logger.error(
`Error installing common resources for AlertsService. No additional resources will be installed and rule execution may be impacted. - ${err.message}`
);
if (err instanceof InstallShutdownError) {
this.options.logger.debug(err.message);
} else {
this.options.logger.error(
`Error installing common resources for AlertsService. No additional resources will be installed and rule execution may be impacted. - ${err.message}`
);
}
this.initialized = false;
this.isInitializing = false;
return errorResult(err.message);

View file

@ -25,4 +25,5 @@ export {
createConcreteWriteIndex,
installWithTimeout,
isValidAlertIndexName,
InstallShutdownError,
} from './lib';

View file

@ -9,5 +9,5 @@ export { createOrUpdateIlmPolicy } from './create_or_update_ilm_policy';
export { createOrUpdateComponentTemplate } from './create_or_update_component_template';
export { getIndexTemplate, createOrUpdateIndexTemplate } from './create_or_update_index_template';
export { createConcreteWriteIndex } from './create_concrete_write_index';
export { installWithTimeout } from './install_with_timeout';
export { installWithTimeout, InstallShutdownError } from './install_with_timeout';
export { isValidAlertIndexName } from './is_valid_alert_index_name';

View file

@ -61,7 +61,7 @@ describe('installWithTimeout', () => {
timeoutMs: 10,
})
).rejects.toThrowErrorMatchingInlineSnapshot(
`"Failure during installation. Server is stopping; must stop all async operations"`
`"Server is stopping; must stop all async operations"`
);
expect(logger.info).not.toHaveBeenCalled();
});

View file

@ -18,6 +18,13 @@ interface InstallWithTimeoutOpts {
timeoutMs?: number;
}
export class InstallShutdownError extends Error {
constructor() {
super('Server is stopping; must stop all async operations');
Object.setPrototypeOf(this, InstallShutdownError.prototype);
}
}
export const installWithTimeout = async ({
description,
installFn,
@ -43,18 +50,22 @@ export const installWithTimeout = async ({
firstValueFrom(pluginStop$).then(() => {
clearTimeout(timeoutId);
reject(new Error('Server is stopping; must stop all async operations'));
reject(new InstallShutdownError());
});
});
};
await Promise.race([install(), throwTimeoutException()]);
} catch (e) {
logger.error(e);
const reason = e?.message || 'Unknown reason';
throw new Error(
`Failure during installation${description ? ` of ${description}` : ''}. ${reason}`
);
if (e instanceof InstallShutdownError) {
logger.debug(e.message);
throw e;
} else {
logger.error(e);
const reason = e?.message || 'Unknown reason';
throw new Error(
`Failure during installation${description ? ` of ${description}` : ''}. ${reason}`
);
}
}
};

View file

@ -66,6 +66,7 @@ export {
createConcreteWriteIndex,
installWithTimeout,
isValidAlertIndexName,
InstallShutdownError,
} from './alerts_service';
export { getDataStreamAdapter } from './alerts_service/lib/data_stream_adapter';

View file

@ -132,6 +132,23 @@ describe('Alerting Plugin', () => {
expect(setupContract.frameworkAlerts.enabled()).toEqual(true);
});
it('should not initialize AlertsService if node.roles.migrator is true', async () => {
const context = coreMock.createPluginInitializerContext<AlertingConfig>({
...generateAlertingConfig(),
enableFrameworkAlerts: true,
});
context.node.roles.migrator = true;
plugin = new AlertingPlugin(context);
// need await to test number of calls of setupMocks.status.set, because it is under async function which awaiting core.getStartServices()
const setupContract = plugin.setup(setupMocks, mockPlugins);
await waitForSetupComplete(setupMocks);
expect(AlertsService).not.toHaveBeenCalled();
expect(setupContract.frameworkAlerts.enabled()).toEqual(true);
});
it(`exposes configured minimumScheduleInterval()`, async () => {
const context = coreMock.createPluginInitializerContext<AlertingConfig>(
generateAlertingConfig()

View file

@ -215,6 +215,7 @@ export class AlertingPlugin {
private alertsService: AlertsService | null;
private pluginStop$: Subject<void>;
private dataStreamAdapter?: DataStreamAdapter;
private nodeRoles: PluginInitializerContext['node']['roles'];
constructor(initializerContext: PluginInitializerContext) {
this.config = initializerContext.config.get();
@ -222,6 +223,7 @@ export class AlertingPlugin {
this.taskRunnerFactory = new TaskRunnerFactory();
this.rulesClientFactory = new RulesClientFactory();
this.alertsService = null;
this.nodeRoles = initializerContext.node.roles;
this.alertingAuthorizationClientFactory = new AlertingAuthorizationClientFactory();
this.rulesSettingsClientFactory = new RulesSettingsClientFactory();
this.maintenanceWindowClientFactory = new MaintenanceWindowClientFactory();
@ -241,11 +243,6 @@ export class AlertingPlugin {
const useDataStreamForAlerts = !!plugins.serverless;
this.dataStreamAdapter = getDataStreamAdapter({ useDataStreamForAlerts });
this.logger.info(
`using ${
this.dataStreamAdapter.isUsingDataStreams() ? 'datastreams' : 'indexes and aliases'
} for persisting alerts`
);
core.capabilities.registerProvider(() => {
return {
@ -278,15 +275,24 @@ export class AlertingPlugin {
plugins.eventLog.registerProviderActions(EVENT_LOG_PROVIDER, Object.values(EVENT_LOG_ACTIONS));
if (this.config.enableFrameworkAlerts) {
this.alertsService = new AlertsService({
logger: this.logger,
pluginStop$: this.pluginStop$,
kibanaVersion: this.kibanaVersion,
dataStreamAdapter: this.dataStreamAdapter!,
elasticsearchClientPromise: core
.getStartServices()
.then(([{ elasticsearch }]) => elasticsearch.client.asInternalUser),
});
if (this.nodeRoles.migrator) {
this.logger.info(`Skipping initialization of AlertsService on migrator node`);
} else {
this.logger.info(
`using ${
this.dataStreamAdapter.isUsingDataStreams() ? 'datastreams' : 'indexes and aliases'
} for persisting alerts`
);
this.alertsService = new AlertsService({
logger: this.logger,
pluginStop$: this.pluginStop$,
kibanaVersion: this.kibanaVersion,
dataStreamAdapter: this.dataStreamAdapter!,
elasticsearchClientPromise: core
.getStartServices()
.then(([{ elasticsearch }]) => elasticsearch.client.asInternalUser),
});
}
}
const ruleTypeRegistry = new RuleTypeRegistry({

View file

@ -11,7 +11,11 @@ import type { ValidFeatureId } from '@kbn/rule-data-utils';
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import type { PublicFrameworkAlertsService, DataStreamAdapter } from '@kbn/alerting-plugin/server';
import {
PublicFrameworkAlertsService,
DataStreamAdapter,
InstallShutdownError,
} from '@kbn/alerting-plugin/server';
import { INDEX_PREFIX } from '../config';
import { type IRuleDataClient, RuleDataClient, WaitResult } from '../rule_data_client';
import { IndexInfo } from './index_info';
@ -158,7 +162,12 @@ export class RuleDataService implements IRuleDataService {
.installCommonResources()
.then(() => right('ok' as const))
.catch((e) => {
this.options.logger.error(e);
if (e instanceof InstallShutdownError) {
this.options.logger.debug(e.message);
} else {
this.options.logger.error(e);
}
return left(e); // propagates it to the index initialization phase
});
@ -203,7 +212,11 @@ export class RuleDataService implements IRuleDataService {
const clusterClient = await this.options.getClusterClient();
return right(clusterClient);
} catch (e) {
this.options.logger.error(e);
if (e instanceof InstallShutdownError) {
this.options.logger.debug(e.message);
} else {
this.options.logger.error(e);
}
return left(e);
}
};