[Fleet] add index and task for fleet-synced-integrations (#209762)

## Summary

Closes https://github.com/elastic/kibana/issues/206237

Create `fleet-synced-integrations` index in Fleet setup, added async
task that populates the index with a doc that includes remote ES output
data and installed integrations data.

ES change to add `kibana_system` privileges:
https://github.com/elastic/elasticsearch/pull/121753

To test locally:
- run elasticsearch from source to apply the privilege changes, so that
`kibana_system` can create the index.
```
yarn es source -E xpack.security.authc.api_key.enabled=true -E xpack.security.authc.token.enabled=true  --source-path=/Users/juliabardi/elasticsearch  -E path.data=/tmp/es-data -E xpack.ml.enabled=false
```
- enable the feature flag in `kibana.dev.yml`:
`xpack.fleet.enableExperimental: ['enableSyncIntegrationsOnRemote']`
- add a remote ES output with sync enabled
- install some integrations
- wait until Fleet setup and the task runs
- verify that the index is created and contains a doc with the expected
data

```
GET fleet-synced-integrations/_search

 "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "fleet-synced-integrations",
        "_id": "fleet-synced-integrations",
        "_score": 1,
        "_source": {
          "remote_es_hosts": [
            {
              "hosts": [
                "http://remote1:80"
              ],
              "name": "remote1",
              "sync_integrations": true
            }
          ],
          "integrations": [
            {
              "package_version": "1.64.1",
              "updated_at": "2025-02-05T11:03:02.226Z",
              "package_name": "system"
            }
          ]
        }
      }
    ]
```



### Checklist

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Julia Bardi 2025-02-12 15:05:58 +01:00 committed by GitHub
parent 9fa8ec42a6
commit 6c257ab50c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 632 additions and 1 deletions

View file

@ -140,6 +140,7 @@ export const createAppContextStartContractMock = (
: {}),
unenrollInactiveAgentsTask: {} as any,
deleteUnenrolledAgentsTask: {} as any,
syncIntegrationsTask: {} as any,
};
};

View file

@ -147,6 +147,7 @@ import { registerUpgradeManagedPackagePoliciesTask } from './services/setup/mana
import { registerDeployAgentPoliciesTask } from './services/agent_policies/deploy_agent_policies_task';
import { DeleteUnenrolledAgentsTask } from './tasks/delete_unenrolled_agents_task';
import { registerBumpAgentPoliciesTask } from './services/agent_policies/bump_agent_policies_task';
import { SyncIntegrationsTask } from './tasks/sync_integrations_task';
export interface FleetSetupDeps {
security: SecurityPluginSetup;
@ -200,6 +201,7 @@ export interface FleetAppContext {
deleteUnenrolledAgentsTask: DeleteUnenrolledAgentsTask;
taskManagerStart?: TaskManagerStartContract;
fetchUsage?: (abortController: AbortController) => Promise<FleetUsage | undefined>;
syncIntegrationsTask: SyncIntegrationsTask;
}
export type FleetSetupContract = void;
@ -301,6 +303,7 @@ export class FleetPlugin
private fleetMetricsTask?: FleetMetricsTask;
private unenrollInactiveAgentsTask?: UnenrollInactiveAgentsTask;
private deleteUnenrolledAgentsTask?: DeleteUnenrolledAgentsTask;
private syncIntegrationsTask?: SyncIntegrationsTask;
private agentService?: AgentService;
private packageService?: PackageService;
@ -647,6 +650,11 @@ export class FleetPlugin
taskManager: deps.taskManager,
logFactory: this.initializerContext.logger,
});
this.syncIntegrationsTask = new SyncIntegrationsTask({
core,
taskManager: deps.taskManager,
logFactory: this.initializerContext.logger,
});
// Register fields metadata extractors
registerFieldsMetadataExtractors({ core, fieldsMetadata: deps.fieldsMetadata });
@ -696,6 +704,7 @@ export class FleetPlugin
deleteUnenrolledAgentsTask: this.deleteUnenrolledAgentsTask!,
taskManagerStart: plugins.taskManager,
fetchUsage: this.fetchUsage,
syncIntegrationsTask: this.syncIntegrationsTask!,
});
licenseService.start(plugins.licensing.license$);
this.telemetryEventsSender.start(plugins.telemetry, core).catch(() => {});
@ -708,6 +717,7 @@ export class FleetPlugin
this.fleetMetricsTask
?.start(plugins.taskManager, core.elasticsearch.client.asInternalUser)
.catch(() => {});
this.syncIntegrationsTask?.start({ taskManager: plugins.taskManager }).catch(() => {});
const logger = appContextService.getLogger();

View file

@ -311,7 +311,7 @@ export async function getPackageSavedObjects(
return result;
}
async function getInstalledPackageSavedObjects(
export async function getInstalledPackageSavedObjects(
savedObjectsClient: SavedObjectsClientContract,
options: Omit<GetInstalledPackagesOptions, 'savedObjectsClient' | 'esClient'>
) {

View file

@ -64,6 +64,7 @@ import {
} from './preconfiguration/delete_unenrolled_agent_setting';
import { backfillPackagePolicySupportsAgentless } from './backfill_agentless';
import { updateDeprecatedComponentTemplates } from './setup/update_deprecated_component_templates';
import { createOrUpdateFleetSyncedIntegrationsIndex } from './setup/fleet_synced_integrations';
export interface SetupStatus {
isInitialized: boolean;
@ -313,6 +314,9 @@ async function createSetupSideEffects(
logger.debug('Update deprecated _source.mode in component templates');
await updateDeprecatedComponentTemplates(esClient);
logger.debug('Create or update fleet-synced-integrations index');
await createOrUpdateFleetSyncedIntegrationsIndex(esClient);
const nonFatalErrors = [
...preconfiguredPackagesNonFatalErrors,
...(messageSigningServiceNonFatalError ? [messageSigningServiceNonFatalError] : []),

View file

@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { createOrUpdateFleetSyncedIntegrationsIndex } from './fleet_synced_integrations';
jest.mock('../app_context', () => ({
appContextService: {
getExperimentalFeatures: jest.fn().mockReturnValue({ enableSyncIntegrationsOnRemote: true }),
},
}));
describe('fleet_synced_integrations', () => {
let esClientMock: any;
const mockExists = jest.fn();
const mockGetMapping = jest.fn();
beforeEach(() => {
esClientMock = {
indices: {
create: jest.fn(),
exists: mockExists,
getMapping: mockGetMapping,
putMapping: jest.fn(),
},
};
});
it('should create index if not exists', async () => {
mockExists.mockResolvedValue(false);
await createOrUpdateFleetSyncedIntegrationsIndex(esClientMock);
expect(esClientMock.indices.create).toHaveBeenCalled();
});
it('should update index if older version exists', async () => {
mockExists.mockResolvedValue(true);
mockGetMapping.mockResolvedValue({
'fleet-synced-integrations': {
mappings: {
_meta: {
version: '0.0',
},
},
},
});
await createOrUpdateFleetSyncedIntegrationsIndex(esClientMock);
expect(esClientMock.indices.putMapping).toHaveBeenCalled();
});
it('should not update index if same version exists', async () => {
mockExists.mockResolvedValue(true);
mockGetMapping.mockResolvedValue({
'fleet-synced-integrations': {
mappings: {
_meta: {
version: '1.0',
},
},
},
});
await createOrUpdateFleetSyncedIntegrationsIndex(esClientMock);
expect(esClientMock.indices.putMapping).not.toHaveBeenCalled();
});
});

View file

@ -0,0 +1,117 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClient } from '@kbn/core/server';
import { FleetSetupError } from '../../errors';
import { appContextService } from '../app_context';
export const FLEET_SYNCED_INTEGRATIONS_INDEX_NAME = 'fleet-synced-integrations';
export const FLEET_SYNCED_INTEGRATIONS_INDEX_CONFIG = {
settings: {
auto_expand_replicas: '0-1',
},
mappings: {
dynamic: false,
_meta: {
version: '1.0',
},
properties: {
remote_es_hosts: {
properties: {
name: {
type: 'keyword',
},
hosts: {
type: 'keyword',
},
sync_integrations: {
type: 'boolean',
},
},
},
integrations: {
properties: {
package_name: {
type: 'keyword',
},
package_version: {
type: 'keyword',
},
updated_at: {
type: 'date',
},
},
},
},
},
};
export async function createOrUpdateFleetSyncedIntegrationsIndex(esClient: ElasticsearchClient) {
const { enableSyncIntegrationsOnRemote } = appContextService.getExperimentalFeatures();
if (!enableSyncIntegrationsOnRemote) {
return;
}
await createOrUpdateIndex(
esClient,
FLEET_SYNCED_INTEGRATIONS_INDEX_NAME,
FLEET_SYNCED_INTEGRATIONS_INDEX_CONFIG
);
}
async function createOrUpdateIndex(
esClient: ElasticsearchClient,
indexName: string,
indexData: any
) {
const resExists = await esClient.indices.exists({
index: indexName,
});
if (resExists) {
return updateIndex(esClient, indexName, indexData);
}
return createIndex(esClient, indexName, indexData);
}
async function updateIndex(esClient: ElasticsearchClient, indexName: string, indexData: any) {
try {
const res = await esClient.indices.getMapping({
index: indexName,
});
const versionChanged =
res[indexName].mappings?._meta?.version !== indexData.mappings._meta.version;
if (versionChanged) {
await esClient.indices.putMapping({
index: indexName,
body: Object.assign({
...indexData.mappings,
}),
});
}
} catch (err) {
throw new FleetSetupError(`update of index [${indexName}] failed ${err}`);
}
}
async function createIndex(esClient: ElasticsearchClient, indexName: string, indexData: any) {
try {
await esClient.indices.create({
index: indexName,
body: indexData,
});
} catch (err) {
if (err?.body?.error?.type !== 'resource_already_exists_exception') {
throw new FleetSetupError(`create of index [${indexName}] failed ${err}`);
}
}
}

View file

@ -9,3 +9,4 @@ export { upgradePackageInstallVersion } from './upgrade_package_install_version'
export { upgradeAgentPolicySchemaVersion } from './upgrade_agent_policy_schema_version';
export { ensureAgentPoliciesFleetServerKeysAndPolicies } from './fleet_server_policies_enrollment_keys';
export { updateDeprecatedComponentTemplates } from './update_deprecated_component_templates';
export { createOrUpdateFleetSyncedIntegrationsIndex } from './fleet_synced_integrations';

View file

@ -0,0 +1,203 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClientMock } from '@kbn/core/server/mocks';
import { coreMock } from '@kbn/core/server/mocks';
import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks';
import type { TaskManagerSetupContract } from '@kbn/task-manager-plugin/server';
import { TaskStatus } from '@kbn/task-manager-plugin/server';
import { getDeleteTaskRunResult } from '@kbn/task-manager-plugin/server/task';
import type { CoreSetup } from '@kbn/core/server';
import { loggingSystemMock } from '@kbn/core/server/mocks';
import { createAppContextStartContractMock } from '../mocks';
import { appContextService, outputService } from '../services';
import { SyncIntegrationsTask, TYPE, VERSION } from './sync_integrations_task';
jest.mock('../services', () => ({
appContextService: {
getExperimentalFeatures: jest.fn().mockReturnValue({ enableSyncIntegrationsOnRemote: true }),
start: jest.fn(),
},
outputService: {
list: jest.fn(),
},
}));
const mockOutputService = outputService as jest.Mocked<typeof outputService>;
jest.mock('../services/epm/packages/get', () => ({
getInstalledPackageSavedObjects: jest.fn().mockResolvedValue({
saved_objects: [
{
attributes: {
name: 'package-1',
version: '0.1.0',
updated_at: new Date().toISOString(),
},
},
{
attributes: {
name: 'package-2',
version: '0.2.0',
updated_at: new Date().toISOString(),
},
},
],
}),
}));
const MOCK_TASK_INSTANCE = {
id: `${TYPE}:${VERSION}`,
runAt: new Date(),
attempts: 0,
ownerId: '',
status: TaskStatus.Running,
startedAt: new Date(),
scheduledAt: new Date(),
retryAt: new Date(),
params: {},
state: {},
taskType: TYPE,
};
describe('SyncIntegrationsTask', () => {
const { createSetup: coreSetupMock } = coreMock;
const { createSetup: tmSetupMock, createStart: tmStartMock } = taskManagerMock;
let mockContract: ReturnType<typeof createAppContextStartContractMock>;
let mockTask: SyncIntegrationsTask;
let mockCore: CoreSetup;
let mockTaskManagerSetup: jest.Mocked<TaskManagerSetupContract>;
beforeEach(() => {
mockContract = createAppContextStartContractMock();
appContextService.start(mockContract);
mockCore = coreSetupMock();
mockTaskManagerSetup = tmSetupMock();
mockTask = new SyncIntegrationsTask({
core: mockCore,
taskManager: mockTaskManagerSetup,
logFactory: loggingSystemMock.create(),
});
});
afterEach(() => {
jest.clearAllMocks();
});
describe('Task lifecycle', () => {
it('Should create task', () => {
expect(mockTask).toBeInstanceOf(SyncIntegrationsTask);
});
it('Should register task', () => {
expect(mockTaskManagerSetup.registerTaskDefinitions).toHaveBeenCalled();
});
it('Should schedule task', async () => {
const mockTaskManagerStart = tmStartMock();
await mockTask.start({ taskManager: mockTaskManagerStart });
expect(mockTaskManagerStart.ensureScheduled).toHaveBeenCalled();
});
});
describe('Task logic', () => {
let esClient: ElasticsearchClientMock;
const runTask = async (taskInstance = MOCK_TASK_INSTANCE) => {
const mockTaskManagerStart = tmStartMock();
await mockTask.start({ taskManager: mockTaskManagerStart });
const createTaskRunner =
mockTaskManagerSetup.registerTaskDefinitions.mock.calls[0][0][TYPE].createTaskRunner;
const taskRunner = createTaskRunner({ taskInstance });
return taskRunner.run();
};
beforeEach(async () => {
const [{ elasticsearch }] = await mockCore.getStartServices();
esClient = elasticsearch.client.asInternalUser as ElasticsearchClientMock;
esClient.indices.exists.mockResolvedValue(true);
});
afterEach(() => {
jest.clearAllMocks();
});
it('Should not run if task is outdated', async () => {
const result = await runTask({ ...MOCK_TASK_INSTANCE, id: 'old-id' });
expect(result).toEqual(getDeleteTaskRunResult());
});
it('Should update fleet-synced-integrations doc', async () => {
mockOutputService.list.mockResolvedValue({
items: [
{
type: 'remote_elasticsearch',
name: 'remote1',
hosts: ['https://remote1:9200'],
sync_integrations: true,
},
{
type: 'remote_elasticsearch',
name: 'remote2',
hosts: ['https://remote2:9200'],
sync_integrations: false,
},
],
} as any);
await runTask();
expect(esClient.update).toHaveBeenCalledWith(
{
body: {
doc: {
integrations: [
{
package_name: 'package-1',
package_version: '0.1.0',
updated_at: expect.any(String),
},
{
package_name: 'package-2',
package_version: '0.2.0',
updated_at: expect.any(String),
},
],
remote_es_hosts: [
{ hosts: ['https://remote1:9200'], name: 'remote1', sync_integrations: true },
{ hosts: ['https://remote2:9200'], name: 'remote2', sync_integrations: false },
],
},
doc_as_upsert: true,
},
id: 'fleet-synced-integrations',
index: 'fleet-synced-integrations',
},
expect.anything()
);
});
it('Should not update fleet-synced-integrations doc if no outputs with sync enabled', async () => {
mockOutputService.list.mockResolvedValue({
items: [
{
type: 'remote_elasticsearch',
name: 'remote2',
hosts: ['https://remote2:9200'],
sync_integrations: false,
},
],
} as any);
await runTask();
expect(esClient.update).not.toHaveBeenCalled();
});
});
});

View file

@ -0,0 +1,221 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { SavedObjectsClient } from '@kbn/core/server';
import type { CoreSetup, ElasticsearchClient, Logger } from '@kbn/core/server';
import type {
ConcreteTaskInstance,
TaskManagerSetupContract,
TaskManagerStartContract,
} from '@kbn/task-manager-plugin/server';
import { getDeleteTaskRunResult } from '@kbn/task-manager-plugin/server/task';
import type { LoggerFactory } from '@kbn/core/server';
import { errors } from '@elastic/elasticsearch';
import { SO_SEARCH_LIMIT, outputType } from '../../common/constants';
import type { NewRemoteElasticsearchOutput } from '../../common/types';
import { appContextService, outputService } from '../services';
import { getInstalledPackageSavedObjects } from '../services/epm/packages/get';
import { FLEET_SYNCED_INTEGRATIONS_INDEX_NAME } from '../services/setup/fleet_synced_integrations';
export const TYPE = 'fleet:sync-integrations-task';
export const VERSION = '1.0.0';
const TITLE = 'Fleet Sync Integrations Task';
const SCOPE = ['fleet'];
const INTERVAL = '5m';
const TIMEOUT = '1m';
interface SyncIntegrationsTaskSetupContract {
core: CoreSetup;
taskManager: TaskManagerSetupContract;
logFactory: LoggerFactory;
}
interface SyncIntegrationsTaskStartContract {
taskManager: TaskManagerStartContract;
}
interface SyncIntegrationsData {
remote_es_hosts: Array<{
name: string;
hosts: string[];
sync_integrations: boolean;
}>;
integrations: Array<{
package_name: string;
package_version: string;
updated_at: string;
}>;
}
export class SyncIntegrationsTask {
private logger: Logger;
private wasStarted: boolean = false;
private abortController = new AbortController();
constructor(setupContract: SyncIntegrationsTaskSetupContract) {
const { core, taskManager, logFactory } = setupContract;
this.logger = logFactory.get(this.taskId);
taskManager.registerTaskDefinitions({
[TYPE]: {
title: TITLE,
timeout: TIMEOUT,
createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => {
return {
run: async () => {
return this.runTask(taskInstance, core);
},
cancel: async () => {
this.abortController.abort('Task cancelled');
},
};
},
},
});
}
public start = async ({ taskManager }: SyncIntegrationsTaskStartContract) => {
if (!taskManager) {
this.logger.error('[SyncIntegrationsTask] Missing required service during start');
return;
}
this.wasStarted = true;
this.logger.info(`[SyncIntegrationsTask] Started with interval of [${INTERVAL}]`);
try {
await taskManager.ensureScheduled({
id: this.taskId,
taskType: TYPE,
scope: SCOPE,
schedule: {
interval: INTERVAL,
},
state: {},
params: { version: VERSION },
});
} catch (e) {
this.logger.error(`Error scheduling task SyncIntegrationsTask, error: ${e.message}`, e);
}
};
private get taskId(): string {
return `${TYPE}:${VERSION}`;
}
private endRun(msg: string = '') {
this.logger.info(`[SyncIntegrationsTask] runTask ended${msg ? ': ' + msg : ''}`);
}
public runTask = async (taskInstance: ConcreteTaskInstance, core: CoreSetup) => {
if (!this.wasStarted) {
this.logger.debug('[SyncIntegrationsTask] runTask Aborted. Task not started yet');
return;
}
// Check that this task is current
if (taskInstance.id !== this.taskId) {
this.logger.debug(
`[SyncIntegrationsTask] Outdated task version: Got [${taskInstance.id}] from task instance. Current version is [${this.taskId}]`
);
return getDeleteTaskRunResult();
}
this.logger.info(`[runTask()] started`);
const [coreStart] = await core.getStartServices();
const esClient = coreStart.elasticsearch.client.asInternalUser;
const soClient = new SavedObjectsClient(coreStart.savedObjects.createInternalRepository());
const { enableSyncIntegrationsOnRemote } = appContextService.getExperimentalFeatures();
if (!enableSyncIntegrationsOnRemote) {
return;
}
const indexExists = await esClient.indices.exists(
{
index: FLEET_SYNCED_INTEGRATIONS_INDEX_NAME,
},
{ signal: this.abortController.signal }
);
if (!indexExists) {
this.logger.info(
`[SyncIntegrationsTask] index ${FLEET_SYNCED_INTEGRATIONS_INDEX_NAME} does not exist`
);
return;
}
try {
await this.updateSyncedIntegrationsData(esClient, soClient);
this.endRun('success');
} catch (err) {
if (err instanceof errors.RequestAbortedError) {
this.logger.warn(`[SyncIntegrationsTask] request aborted due to timeout: ${err}`);
this.endRun();
return;
}
this.logger.error(`[SyncIntegrationsTask] error: ${err}`);
this.endRun('error');
}
};
private updateSyncedIntegrationsData = async (
esClient: ElasticsearchClient,
soClient: SavedObjectsClient
) => {
const outputs = await outputService.list(soClient);
const remoteESOutputs = outputs.items.filter(
(output) => output.type === outputType.RemoteElasticsearch
);
const isSyncEnabled = remoteESOutputs.some(
(output) => (output as NewRemoteElasticsearchOutput).sync_integrations
);
if (!isSyncEnabled) {
return;
}
const newDoc: SyncIntegrationsData = {
remote_es_hosts: remoteESOutputs.map((output) => {
const remoteOutput = output as NewRemoteElasticsearchOutput;
return {
name: remoteOutput.name,
hosts: remoteOutput.hosts ?? [],
sync_integrations: remoteOutput.sync_integrations ?? false,
};
}),
integrations: [],
};
const packageSavedObjects = await getInstalledPackageSavedObjects(soClient, {
perPage: SO_SEARCH_LIMIT,
sortOrder: 'asc',
});
newDoc.integrations = packageSavedObjects.saved_objects.map((item) => {
return {
package_name: item.attributes.name,
package_version: item.attributes.version,
updated_at: item.updated_at ?? new Date().toISOString(),
};
});
await esClient.update(
{
id: FLEET_SYNCED_INTEGRATIONS_INDEX_NAME,
index: FLEET_SYNCED_INTEGRATIONS_INDEX_NAME,
body: {
doc: newDoc,
doc_as_upsert: true,
},
},
{ signal: this.abortController.signal }
);
};
}

View file

@ -151,6 +151,7 @@ export default function ({ getService }: FtrProviderContext) {
'fleet:reassign_action:retry',
'fleet:request_diagnostics:retry',
'fleet:setup:upgrade_managed_package_policies',
'fleet:sync-integrations-task',
'fleet:unenroll-inactive-agents-task',
'fleet:unenroll_action:retry',
'fleet:update_agent_tags:retry',