[EDR Workflows][Investigation] Telemetry config watcher fix (#210406)

## Summary

To update the `global_telemetry_config` flag in Defend package policies,
we subscribe to the Telemetry plugin's `isOptedIn$` observable during
Kibana's `start()` phase, and receive the initial value immediately.
This feature is used for 'migrating' existing package policies: after
stack upgrade, when Kibana starts up, this subscription mechanism makes
sure that existing policies are backfilled with the new field.

But not on cloud and serverless instances.

It turned out, that while this works on local instances, on cloud and
serverless instances, at the very moment we receive the value during
`start()`, some mechanisms are not yet green, and this resulted in
`security_exception: missing authentication credentials for REST
request` when trying to read Saved Objects.

As subscribing to`core.status.core$`, and waiting until `ServiceStatus`
for `elasticsearch` and `savedObjects` is `available` didn't solve the
issue, I simply added a retry mechanism, which, at least, protects
against other temporary issues as well.

Some additional logging is added as well.

### Checklist

Check the PR satisfies following conditions. 

Reviewers should verify this PR satisfies this list as well.

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Gergő Ábrahám 2025-03-03 13:31:00 +01:00 committed by GitHub
parent 151fa26a5f
commit e4ea87e92b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 206 additions and 65 deletions

View file

@ -6,35 +6,41 @@
*/
import { Subject } from 'rxjs';
import { elasticsearchServiceMock, savedObjectsServiceMock } from '@kbn/core/server/mocks';
import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
import { createPackagePolicyServiceMock } from '@kbn/fleet-plugin/server/mocks';
import type { PackagePolicyClient } from '@kbn/fleet-plugin/server';
import type { PackagePolicy, UpdatePackagePolicy } from '@kbn/fleet-plugin/common';
import {
PACKAGE_POLICY_SAVED_OBJECT_TYPE,
type PackagePolicy,
type UpdatePackagePolicy,
} from '@kbn/fleet-plugin/common';
import { createPackagePolicyMock } from '@kbn/fleet-plugin/common/mocks';
import { policyFactory } from '../../../../common/endpoint/models/policy_config';
import type { PolicyConfig } from '../../../../common/endpoint/types';
import { TelemetryConfigWatcher } from './telemetry_watch';
import { TelemetryConfigProvider } from '../../../../common/telemetry_config/telemetry_config_provider';
import { createMockEndpointAppContextService } from '../../mocks';
import type { MockedLogger } from '@kbn/logging-mocks';
import { loggerMock } from '@kbn/logging-mocks';
import type { NewPackagePolicyWithId } from '@kbn/fleet-plugin/server/services/package_policy';
const MockPackagePolicyWithEndpointPolicy = (
cb?: (p: PolicyConfig) => PolicyConfig
): PackagePolicy => {
const packagePolicy = createPackagePolicyMock();
if (!cb) {
// eslint-disable-next-line no-param-reassign
cb = (p) => p;
}
const policyConfig = cb(policyFactory());
const policyConfig = cb?.(policyFactory()) ?? policyFactory();
packagePolicy.inputs[0].config = { policy: { value: policyConfig } };
return packagePolicy;
};
describe('Telemetry config watcher', () => {
const soStartMock = savedObjectsServiceMock.createStartContract();
const esStartMock = elasticsearchServiceMock.createStart();
let packagePolicySvcMock: jest.Mocked<PackagePolicyClient>;
let mockedLogger: MockedLogger;
let packagePolicyServiceMock: jest.Mocked<PackagePolicyClient>;
let telemetryWatcher: TelemetryConfigWatcher;
const preparePackagePolicyMock = ({
@ -42,7 +48,7 @@ describe('Telemetry config watcher', () => {
}: {
isGlobalTelemetryEnabled: boolean;
}) => {
packagePolicySvcMock.list.mockResolvedValueOnce({
packagePolicyServiceMock.list.mockResolvedValueOnce({
items: [
MockPackagePolicyWithEndpointPolicy((pc: PolicyConfig): PolicyConfig => {
pc.global_telemetry_enabled = isGlobalTelemetryEnabled;
@ -56,13 +62,21 @@ describe('Telemetry config watcher', () => {
};
beforeEach(() => {
packagePolicySvcMock = createPackagePolicyServiceMock();
packagePolicyServiceMock = createPackagePolicyServiceMock();
packagePolicyServiceMock.bulkUpdate.mockResolvedValue({
updatedPolicies: [],
failedPolicies: [],
});
mockedLogger = loggerMock.create();
const endpointAppContextServiceMock = createMockEndpointAppContextService();
endpointAppContextServiceMock.createLogger = jest.fn().mockReturnValue(mockedLogger);
telemetryWatcher = new TelemetryConfigWatcher(
packagePolicySvcMock,
soStartMock,
packagePolicyServiceMock,
esStartMock,
createMockEndpointAppContextService()
endpointAppContextServiceMock,
{ immediateRetry: true }
);
});
@ -90,7 +104,7 @@ describe('Telemetry config watcher', () => {
const TOTAL = 247;
// set up the mocked package policy service to return and do what we want
packagePolicySvcMock.list
packagePolicyServiceMock.list
.mockResolvedValueOnce({
items: Array.from({ length: 100 }, () => MockPackagePolicyWithEndpointPolicy()),
total: TOTAL,
@ -112,12 +126,101 @@ describe('Telemetry config watcher', () => {
await telemetryWatcher.watch(true); // manual trigger
expect(packagePolicySvcMock.list).toBeCalledTimes(3);
expect(packagePolicyServiceMock.list).toBeCalledTimes(3);
// Assert: on the first call to packagePolicy.list, we asked for page 1
expect(packagePolicySvcMock.list.mock.calls[0][1].page).toBe(1);
expect(packagePolicySvcMock.list.mock.calls[1][1].page).toBe(2); // second call, asked for page 2
expect(packagePolicySvcMock.list.mock.calls[2][1].page).toBe(3); // etc
expect(packagePolicyServiceMock.list.mock.calls[0][1].page).toBe(1);
expect(packagePolicyServiceMock.list.mock.calls[1][1].page).toBe(2); // second call, asked for page 2
expect(packagePolicyServiceMock.list.mock.calls[2][1].page).toBe(3); // etc
expect(mockedLogger.warn).not.toHaveBeenCalled();
expect(mockedLogger.error).not.toHaveBeenCalled();
});
describe('error handling', () => {
it('retries fetching package policies', async () => {
packagePolicyServiceMock.list.mockRejectedValueOnce(new Error());
packagePolicyServiceMock.list.mockResolvedValueOnce({
items: Array.from({ length: 6 }, () => MockPackagePolicyWithEndpointPolicy()),
total: 6,
page: 1,
perPage: 100,
});
await telemetryWatcher.watch(true);
expect(packagePolicyServiceMock.list).toBeCalledTimes(2);
const expectedParams = {
page: 1,
perPage: 100,
kuery: `${PACKAGE_POLICY_SAVED_OBJECT_TYPE}.package.name: endpoint`,
};
expect(packagePolicyServiceMock.list.mock.calls[0][1]).toStrictEqual(expectedParams);
expect(packagePolicyServiceMock.list.mock.calls[1][1]).toStrictEqual(expectedParams);
expect(mockedLogger.warn).not.toHaveBeenCalled();
expect(mockedLogger.error).not.toHaveBeenCalled();
});
it('retries fetching package policies maximum 5 times', async () => {
packagePolicyServiceMock.list.mockRejectedValue(new Error());
await telemetryWatcher.watch(true);
expect(packagePolicyServiceMock.list).toBeCalledTimes(5);
expect(mockedLogger.warn).toHaveBeenCalledTimes(1);
expect(mockedLogger.error).not.toHaveBeenCalled();
});
it('retries bulk updating package policies', async () => {
preparePackagePolicyMock({ isGlobalTelemetryEnabled: true });
packagePolicyServiceMock.bulkUpdate.mockRejectedValueOnce(new Error());
await telemetryWatcher.watch(false);
expect(packagePolicyServiceMock.bulkUpdate).toHaveBeenCalledTimes(2);
expect(mockedLogger.warn).not.toHaveBeenCalled();
expect(mockedLogger.error).not.toHaveBeenCalled();
});
it('retries bulk updating package policies maximum 5 times', async () => {
preparePackagePolicyMock({ isGlobalTelemetryEnabled: true });
packagePolicyServiceMock.bulkUpdate.mockRejectedValue(new Error());
await telemetryWatcher.watch(false);
expect(packagePolicyServiceMock.bulkUpdate).toHaveBeenCalledTimes(5);
expect(mockedLogger.warn).toHaveBeenCalledTimes(1);
expect(mockedLogger.error).not.toHaveBeenCalled();
});
it('logs the ids of package policies that are failed to be updated', async () => {
preparePackagePolicyMock({ isGlobalTelemetryEnabled: true });
packagePolicyServiceMock.bulkUpdate.mockResolvedValueOnce({
updatedPolicies: [],
failedPolicies: [
{
error: new Error('error message 1'),
packagePolicy: { id: 'policy-id-1' } as NewPackagePolicyWithId,
},
{
error: new Error('error message 2'),
packagePolicy: { id: 'policy-id-2' } as NewPackagePolicyWithId,
},
],
});
await telemetryWatcher.watch(false);
expect(packagePolicyServiceMock.bulkUpdate).toHaveBeenCalledTimes(1);
expect(mockedLogger.warn).toHaveBeenCalledTimes(1);
expect(mockedLogger.error).not.toHaveBeenCalled();
const logMessage = mockedLogger.warn.mock.calls[0][0] as string;
expect(logMessage).toMatch(/- id: policy-id-1, error:.+error message 1/);
expect(logMessage).toMatch(/- id: policy-id-2, error:.+error message 2/);
});
});
it.each([true, false])(
@ -127,17 +230,18 @@ describe('Telemetry config watcher', () => {
await telemetryWatcher.watch(value);
expect(packagePolicySvcMock.bulkUpdate).not.toHaveBeenCalled();
expect(packagePolicyServiceMock.bulkUpdate).not.toHaveBeenCalled();
}
);
it.each([true, false])('updates `global_telemetry_config` field to %s', async (value) => {
it.each([true, false])('updates `global_telemetry_enabled` field to %s', async (value) => {
preparePackagePolicyMock({ isGlobalTelemetryEnabled: !value });
await telemetryWatcher.watch(value);
expect(packagePolicySvcMock.bulkUpdate).toHaveBeenCalled();
const policyUpdates: UpdatePackagePolicy[] = packagePolicySvcMock.bulkUpdate.mock.calls[0][2];
expect(packagePolicyServiceMock.bulkUpdate).toHaveBeenCalled();
const policyUpdates: UpdatePackagePolicy[] =
packagePolicyServiceMock.bulkUpdate.mock.calls[0][2];
expect(policyUpdates.length).toBe(1);
const updatedPolicyConfigs: PolicyConfig = policyUpdates[0].inputs[0].config?.policy.value;
expect(updatedPolicyConfigs.global_telemetry_enabled).toBe(value);

View file

@ -10,36 +10,48 @@ import type { Subscription } from 'rxjs';
import type {
ElasticsearchClient,
ElasticsearchServiceStart,
KibanaRequest,
Logger,
SavedObjectsClientContract,
SavedObjectsServiceStart,
} from '@kbn/core/server';
import type { PackagePolicy, UpdatePackagePolicy } from '@kbn/fleet-plugin/common';
import { PACKAGE_POLICY_SAVED_OBJECT_TYPE } from '@kbn/fleet-plugin/common';
import type { PackagePolicyClient } from '@kbn/fleet-plugin/server';
import { SECURITY_EXTENSION_ID } from '@kbn/core-saved-objects-server';
import pRetry from 'p-retry';
import type { TelemetryConfigProvider } from '../../../../common/telemetry_config/telemetry_config_provider';
import type { PolicyData } from '../../../../common/endpoint/types';
import { getPolicyDataForUpdate } from '../../../../common/endpoint/service/policy';
import type { EndpointAppContextService } from '../../endpoint_app_context_services';
import { stringify } from '../../utils/stringify';
interface TelemetryConfigWatcherOptions {
/** Retry SO operations immediately, without any delay. Useful for testing.
*/
immediateRetry: boolean;
}
export class TelemetryConfigWatcher {
private logger: Logger;
private esClient: ElasticsearchClient;
private policyService: PackagePolicyClient;
private endpointAppContextService: EndpointAppContextService;
private subscription: Subscription | undefined;
private soStart: SavedObjectsServiceStart;
private retryOptions: Partial<pRetry.Options>;
constructor(
policyService: PackagePolicyClient,
soStart: SavedObjectsServiceStart,
esStart: ElasticsearchServiceStart,
endpointAppContextService: EndpointAppContextService
endpointAppContextService: EndpointAppContextService,
options: TelemetryConfigWatcherOptions = { immediateRetry: false }
) {
this.policyService = policyService;
this.esClient = esStart.client.asInternalUser;
this.endpointAppContextService = endpointAppContextService;
this.logger = endpointAppContextService.createLogger(this.constructor.name);
this.soStart = soStart;
this.retryOptions = {
retries: 4,
minTimeout: options.immediateRetry ? 0 : 1000,
};
}
/**
@ -49,16 +61,8 @@ export class TelemetryConfigWatcher {
* intentionally using the system user here. Be very aware of what you are using this
* client to do
*/
private makeInternalSOClient(soStart: SavedObjectsServiceStart): SavedObjectsClientContract {
const fakeRequest = {
headers: {},
getBasePath: () => '',
path: '/',
route: { settings: {} },
url: { href: {} },
raw: { req: { url: '/' } },
} as unknown as KibanaRequest;
return soStart.getScopedClient(fakeRequest, { excludedExtensions: [SECURITY_EXTENSION_ID] });
private makeInternalSOClient(): SavedObjectsClientContract {
return this.endpointAppContextService.savedObjects.createInternalUnscopedSoClient(false);
}
public start(telemetryConfigProvider: TelemetryConfigProvider) {
@ -79,6 +83,8 @@ export class TelemetryConfigWatcher {
page: number;
perPage: number;
};
let updated = 0;
let failed = 0;
this.logger.debug(
`Checking Endpoint policies to update due to changed global telemetry config setting. (New value: ${isTelemetryEnabled})`
@ -86,14 +92,28 @@ export class TelemetryConfigWatcher {
do {
try {
response = await this.policyService.list(this.makeInternalSOClient(this.soStart), {
page: page++,
perPage: 100,
kuery: `${PACKAGE_POLICY_SAVED_OBJECT_TYPE}.package.name: endpoint`,
});
response = await pRetry(
() =>
this.policyService.list(this.makeInternalSOClient(), {
page,
perPage: 100,
kuery: `${PACKAGE_POLICY_SAVED_OBJECT_TYPE}.package.name: endpoint`,
}),
{
onFailedAttempt: (error) =>
this.logger.debug(
`Failed to read package policies on ${
error.attemptNumber
}. attempt on page ${page}, reason: ${stringify(error)}`
),
...this.retryOptions,
}
);
} catch (e) {
this.logger.warn(
`Unable to verify endpoint policies in line with telemetry change: failed to fetch package policies: ${e.message}`
`Unable to verify endpoint policies in line with telemetry change: failed to fetch package policies: ${stringify(
e
)}`
);
return;
}
@ -112,29 +132,46 @@ export class TelemetryConfigWatcher {
if (updates.length) {
try {
await this.policyService.bulkUpdate(
this.makeInternalSOClient(this.soStart),
this.esClient,
updates
const updateResult = await pRetry(
() =>
this.policyService.bulkUpdate(this.makeInternalSOClient(), this.esClient, updates),
{
onFailedAttempt: (error) =>
this.logger.debug(
`Failed to bulk update package policies on ${
error.attemptNumber
}. attempt, reason: ${stringify(error)}`
),
...this.retryOptions,
}
);
} catch (e) {
// try again for transient issues
try {
await this.policyService.bulkUpdate(
this.makeInternalSOClient(this.soStart),
this.esClient,
updates
);
} catch (ee) {
if (updateResult.failedPolicies.length) {
this.logger.warn(
`Unable to update telemetry config state to ${isTelemetryEnabled} in policies: ${updates.map(
(update) => update.id
)}`
`Cannot update telemetry flag in the following policies:\n${updateResult.failedPolicies
.map((entry) => `- id: ${entry.packagePolicy.id}, error: ${stringify(entry.error)}`)
.join('\n')}`
);
this.logger.warn(ee);
}
updated += updateResult.updatedPolicies?.length ?? 0;
failed += updateResult.failedPolicies.length;
} catch (e) {
this.logger.warn(
`Unable to update telemetry config state to ${isTelemetryEnabled} in policies: ${updates.map(
(update) => update.id
)}\n\n${stringify(e)}`
);
failed += updates.length;
}
}
page++;
} while (response.page * response.perPage < response.total);
this.logger.info(
`Finished updating global_telemetry_enabled flag to ${isTelemetryEnabled} in Defend package policies: ${updated} succeeded, ${failed} failed.`
);
}
}

View file

@ -690,9 +690,8 @@ export class Plugin implements ISecuritySolutionPlugin {
this.telemetryWatcher = new TelemetryConfigWatcher(
plugins.fleet.packagePolicyService,
core.savedObjects,
core.elasticsearch,
this.endpointContext.service
this.endpointAppContextService
);
this.telemetryWatcher.start(this.telemetryConfigProvider);
}
@ -786,6 +785,7 @@ export class Plugin implements ISecuritySolutionPlugin {
this.telemetryEventsSender.stop();
this.endpointAppContextService.stop();
this.policyWatcher?.stop();
this.telemetryWatcher?.stop();
this.completeExternalResponseActionsTask.stop().catch(() => {});
this.siemMigrationsService.stop();
securityWorkflowInsightsService.stop();