[Fleet] sync integrations from follower index (#212371)

## Summary

Relates https://github.com/elastic/kibana/issues/206242

Implemented installing integrations from the doc in the follower index.
Can be tested locally by creating the ccr index locally or setting up 2
local clusters and set up ccr between them.

To test:
- Requires https://github.com/elastic/elasticsearch/pull/123493,
checkout and run es from source
```
yarn es source --license trial -E xpack.security.authc.api_key.enabled=true -E xpack.security.authc.token.enabled=true  --source-path=/Users/juliabardi/elasticsearch  -E path.data=/tmp/es-data -E xpack.ml.enabled=false
```
- Enable feature flag `xpack.fleet.enableExperimental:
['enableSyncIntegrationsOnRemote']`
- Create doc in ccr index, `hosts` should match local elasticsearch host
```
POST fleet-synced-integrations-ccr-remote1/_doc
{
  "id": "fleet-synced-integrations",
 "remote_es_hosts": [
            {
              "hosts": [
                "http://192.168.64.1:9200"
              ],
              "name": "remote1",
              "sync_integrations": true
            }
          ],
          "integrations": [
            {
              "package_version": "1.25.0",
              "package_name": "nginx",
              "updated_at": "2025-02-24T09:03:51.936Z"
            }
          ]
}
```
- Wait 1m until the task runs, verify that the integrations from the doc
are installed


### Checklist

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
This commit is contained in:
Julia Bardi 2025-03-03 14:21:25 +01:00 committed by GitHub
parent 883bc1794b
commit 3e2373fd08
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 727 additions and 22 deletions

View file

@ -84,6 +84,7 @@ export interface PackageClient {
pkgVersion?: string;
spaceId?: string;
force?: boolean;
keepFailedInstallation?: boolean;
}): Promise<InstallResult>;
installCustomIntegration(options: {
@ -225,10 +226,17 @@ class PackageClientImpl implements PackageClient {
pkgVersion?: string;
spaceId?: string;
force?: boolean;
keepFailedInstallation?: boolean;
}): Promise<InstallResult> {
await this.#runPreflight(INSTALL_PACKAGES_AUTHZ);
const { pkgName, pkgVersion, spaceId = DEFAULT_SPACE_ID, force = false } = options;
const {
pkgName,
pkgVersion,
spaceId = DEFAULT_SPACE_ID,
force = false,
keepFailedInstallation,
} = options;
// If pkgVersion isn't specified, find the latest package version
const pkgKeyProps = pkgVersion
@ -244,6 +252,7 @@ class PackageClientImpl implements PackageClient {
esClient: this.internalEsClient,
savedObjectsClient: this.internalSoClient,
neverIgnoreVerificationError: !force,
keepFailedInstallation,
});
}

View file

@ -261,6 +261,7 @@ export async function handleInstallPackageFailure({
esClient,
spaceId,
authorizationHeader,
keepFailedInstallation,
}: {
savedObjectsClient: SavedObjectsClientContract;
error: FleetError | Boom.Boom | Error;
@ -270,6 +271,7 @@ export async function handleInstallPackageFailure({
esClient: ElasticsearchClient;
spaceId: string;
authorizationHeader?: HTTPAuthorizationHeader | null;
keepFailedInstallation?: boolean;
}) {
if (error instanceof ConcurrentInstallOperationError) {
return;
@ -305,6 +307,9 @@ export async function handleInstallPackageFailure({
logger.error(
`Uninstalling ${pkgkey} after error installing: [${error.toString()}] with install type: ${installType}`
);
if (keepFailedInstallation) {
return;
}
await removeInstallation({ savedObjectsClient, pkgName, pkgVersion, esClient });
return;
}
@ -387,6 +392,7 @@ interface InstallRegistryPackageParams {
ignoreMappingUpdateErrors?: boolean;
skipDataStreamRollover?: boolean;
retryFromLastState?: boolean;
keepFailedInstallation?: boolean;
}
export interface CustomPackageDatasetConfiguration {
@ -450,6 +456,7 @@ async function installPackageFromRegistry({
ignoreMappingUpdateErrors = false,
skipDataStreamRollover = false,
retryFromLastState = false,
keepFailedInstallation = false,
}: InstallRegistryPackageParams): Promise<InstallResult> {
const logger = appContextService.getLogger();
// TODO: change epm API to /packageName/version so we don't need to do this
@ -551,6 +558,7 @@ async function installPackageFromRegistry({
skipDataStreamRollover,
retryFromLastState,
useStreaming,
keepFailedInstallation,
});
} catch (e) {
sendEvent({
@ -590,6 +598,7 @@ async function installPackageWithStateMachine(options: {
skipDataStreamRollover?: boolean;
retryFromLastState?: boolean;
useStreaming?: boolean;
keepFailedInstallation?: boolean;
}): Promise<InstallResult> {
const packageInfo = options.packageInstallContext.packageInfo;
@ -610,6 +619,7 @@ async function installPackageWithStateMachine(options: {
packageInstallContext,
retryFromLastState,
useStreaming,
keepFailedInstallation,
} = options;
let { telemetryEvent } = options;
const logger = appContextService.getLogger();
@ -735,6 +745,7 @@ async function installPackageWithStateMachine(options: {
spaceId,
esClient,
authorizationHeader,
keepFailedInstallation,
});
sendEvent({
...telemetryEvent!,
@ -896,6 +907,7 @@ export async function installPackage(args: InstallPackageParams): Promise<Instal
ignoreMappingUpdateErrors,
skipDataStreamRollover,
retryFromLastState,
keepFailedInstallation,
} = args;
const matchingBundledPackage = await getBundledPackageByPkgKey(pkgkey);
@ -938,6 +950,7 @@ export async function installPackage(args: InstallPackageParams): Promise<Instal
ignoreMappingUpdateErrors,
skipDataStreamRollover,
retryFromLastState,
keepFailedInstallation,
});
return response;

View file

@ -0,0 +1,407 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { PackageNotFoundError } from '../errors';
import { outputService } from '../services';
import { syncIntegrationsOnRemote } from './sync_integrations_on_remote';
jest.mock('../services');
const outputServiceMock = outputService as jest.Mocked<typeof outputService>;
describe('syncIntegrationsOnRemote', () => {
const abortController = new AbortController();
let esClientMock: any;
let getIndicesMock: jest.Mock;
let searchMock: jest.Mock;
let packageClientMock: any;
let loggerMock: any;
beforeEach(() => {
getIndicesMock = jest.fn();
searchMock = jest.fn();
esClientMock = {
indices: {
get: getIndicesMock,
},
search: searchMock,
};
outputServiceMock.list.mockResolvedValue({
items: [
{
type: 'elasticsearch',
hosts: ['http://localhost:9200'],
},
],
} as any);
packageClientMock = {
getInstallation: jest.fn(),
installPackage: jest.fn(),
};
loggerMock = {
debug: jest.fn(),
error: jest.fn(),
warn: jest.fn(),
info: jest.fn(),
};
});
it('should throw error if multiple synced integrations ccr indices exist', async () => {
getIndicesMock.mockResolvedValue({
'fleet-synced-integrations-ccr-remote1': {},
'fleet-synced-integrations-ccr-remote2': {},
});
await expect(
syncIntegrationsOnRemote(esClientMock, {} as any, {} as any, abortController, loggerMock)
).rejects.toThrowError(
'Not supported to sync multiple indices with prefix fleet-synced-integrations-ccr-*'
);
});
function getSyncedIntegrationsCCRDoc(syncEnabled: boolean) {
return {
hits: {
hits: [
{
_source: {
remote_es_hosts: [
{
hosts: ['http://localhost:9200'],
sync_integrations: syncEnabled,
},
],
integrations: [
{
package_name: 'nginx',
package_version: '2.2.0',
updated_at: '2021-01-01T00:00:00.000Z',
},
{
package_name: 'system',
package_version: '2.2.0',
updated_at: '2021-01-01T00:00:00.000Z',
},
],
},
},
],
},
};
}
it('should do nothing if no matching remote output has sync enabled', async () => {
getIndicesMock.mockResolvedValue({
'fleet-synced-integrations-ccr-remote1': {},
});
searchMock.mockResolvedValue(getSyncedIntegrationsCCRDoc(false));
await syncIntegrationsOnRemote(
esClientMock,
{} as any,
packageClientMock,
abortController,
loggerMock
);
expect(packageClientMock.getInstallation).not.toHaveBeenCalled();
});
it('should do nothing if sync enabled and packages are installed', async () => {
getIndicesMock.mockResolvedValue({
'fleet-synced-integrations-ccr-remote1': {},
});
searchMock.mockResolvedValue(getSyncedIntegrationsCCRDoc(true));
packageClientMock.getInstallation.mockImplementation((packageName: string) =>
packageName === 'nginx'
? {
install_status: 'installed',
version: '2.2.0',
}
: {
install_status: 'installed',
version: '2.3.0',
}
);
await syncIntegrationsOnRemote(
esClientMock,
{} as any,
packageClientMock,
abortController,
loggerMock
);
expect(packageClientMock.installPackage).not.toHaveBeenCalled();
});
it('should install package if lower version is installed', async () => {
getIndicesMock.mockResolvedValue({
'fleet-synced-integrations-ccr-remote1': {},
});
searchMock.mockResolvedValue(getSyncedIntegrationsCCRDoc(true));
packageClientMock.getInstallation.mockImplementation((packageName: string) =>
packageName === 'nginx'
? {
install_status: 'installed',
version: '2.1.0',
}
: {
install_status: 'installed',
version: '2.2.0',
}
);
packageClientMock.installPackage.mockResolvedValue({
status: 'installed',
});
await syncIntegrationsOnRemote(
esClientMock,
{} as any,
packageClientMock,
abortController,
loggerMock
);
expect(packageClientMock.installPackage).toHaveBeenCalledWith({
pkgName: 'nginx',
pkgVersion: '2.2.0',
keepFailedInstallation: true,
force: true,
});
});
it('should keep installing all packages when one throws error', async () => {
getIndicesMock.mockResolvedValue({
'fleet-synced-integrations-ccr-remote1': {},
});
searchMock.mockResolvedValue(getSyncedIntegrationsCCRDoc(true));
packageClientMock.getInstallation.mockImplementation((packageName: string) =>
packageName === 'nginx'
? {
install_status: 'installed',
version: '2.1.0',
}
: {
install_status: 'installed',
version: '2.0.0',
}
);
packageClientMock.installPackage.mockImplementation(({ pkgName }: any) => {
if (pkgName === 'nginx') {
throw new Error('failed to install');
} else {
return {
status: 'installed',
};
}
});
await syncIntegrationsOnRemote(
esClientMock,
{} as any,
packageClientMock,
abortController,
loggerMock
);
expect(packageClientMock.installPackage).toHaveBeenCalledTimes(2);
});
it('should try to install latest package on PackageNotFoundError', async () => {
getIndicesMock.mockResolvedValue({
'fleet-synced-integrations-ccr-remote1': {},
});
searchMock.mockResolvedValue(getSyncedIntegrationsCCRDoc(true));
packageClientMock.getInstallation.mockImplementation((packageName: string) =>
packageName === 'nginx'
? undefined
: {
install_status: 'installed',
version: '2.2.0',
}
);
packageClientMock.installPackage.mockImplementation(({ pkgName, pkgVersion }: any) => {
if (pkgVersion === '2.2.0') {
return {
error: new PackageNotFoundError('not found'),
};
}
return {
status: 'installed',
};
});
await syncIntegrationsOnRemote(
esClientMock,
{} as any,
packageClientMock,
abortController,
loggerMock
);
expect(packageClientMock.installPackage).toHaveBeenCalledTimes(2);
});
it('should not retry if max retry attempts reached', async () => {
getIndicesMock.mockResolvedValue({
'fleet-synced-integrations-ccr-remote1': {},
});
searchMock.mockResolvedValue(getSyncedIntegrationsCCRDoc(true));
packageClientMock.getInstallation.mockImplementation((packageName: string) =>
packageName === 'nginx'
? {
install_status: 'install_failed',
version: '2.1.0',
latest_install_failed_attempts: [
{
created_at: new Date().toISOString(),
},
{
created_at: '2025-01-28T08:11:44.395Z',
},
{
created_at: '2025-01-27T08:11:44.395Z',
},
{
created_at: '2025-01-26T08:11:44.395Z',
},
{
created_at: '2025-01-25T08:11:44.395Z',
},
],
}
: {
install_status: 'installed',
version: '2.2.0',
}
);
packageClientMock.installPackage.mockResolvedValue({
status: 'installed',
});
await syncIntegrationsOnRemote(
esClientMock,
{} as any,
packageClientMock,
abortController,
loggerMock
);
expect(packageClientMock.installPackage).not.toHaveBeenCalled();
});
it('should not retry if retry time not passed', async () => {
getIndicesMock.mockResolvedValue({
'fleet-synced-integrations-ccr-remote1': {},
});
searchMock.mockResolvedValue(getSyncedIntegrationsCCRDoc(true));
packageClientMock.getInstallation.mockImplementation((packageName: string) =>
packageName === 'nginx'
? {
install_status: 'install_failed',
version: '2.1.0',
latest_install_failed_attempts: [
{
created_at: new Date().toISOString(),
},
{
created_at: '2025-01-28T08:11:44.395Z',
},
{
created_at: '2025-01-27T08:11:44.395Z',
},
{
created_at: '2025-01-26T08:11:44.395Z',
},
],
}
: {
install_status: 'installed',
version: '2.2.0',
}
);
packageClientMock.installPackage.mockResolvedValue({
status: 'installed',
});
await syncIntegrationsOnRemote(
esClientMock,
{} as any,
packageClientMock,
abortController,
loggerMock
);
expect(packageClientMock.installPackage).not.toHaveBeenCalled();
});
it('should retry if retry time passed', async () => {
getIndicesMock.mockResolvedValue({
'fleet-synced-integrations-ccr-remote1': {},
});
searchMock.mockResolvedValue(getSyncedIntegrationsCCRDoc(true));
packageClientMock.getInstallation.mockImplementation((packageName: string) =>
packageName === 'nginx'
? {
install_status: 'install_failed',
version: '2.1.0',
latest_install_failed_attempts: [
{
created_at: '2025-02-28T04:11:44.395Z',
},
],
}
: {
install_status: 'installed',
version: '2.2.0',
}
);
packageClientMock.installPackage.mockResolvedValue({
status: 'installed',
});
await syncIntegrationsOnRemote(
esClientMock,
{} as any,
packageClientMock,
abortController,
loggerMock
);
expect(packageClientMock.installPackage).toHaveBeenCalled();
});
it('should do nothing if sync enabled and the package is installing', async () => {
getIndicesMock.mockResolvedValue({
'fleet-synced-integrations-ccr-remote1': {},
});
searchMock.mockResolvedValue(getSyncedIntegrationsCCRDoc(true));
packageClientMock.getInstallation.mockImplementation((packageName: string) =>
packageName === 'nginx'
? {
install_status: 'installing',
version: '2.1.0',
}
: {
install_status: 'installed',
version: '2.3.0',
}
);
await syncIntegrationsOnRemote(
esClientMock,
{} as any,
packageClientMock,
abortController,
loggerMock
);
expect(packageClientMock.installPackage).not.toHaveBeenCalled();
});
});

View file

@ -0,0 +1,170 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { ElasticsearchClient, SavedObjectsClient, Logger } from '@kbn/core/server';
import semverGte from 'semver/functions/gte';
import type { PackageClient } from '../services';
import { outputService } from '../services';
import { PackageNotFoundError } from '../errors';
import type { SyncIntegrationsData } from './sync_integrations_task';
const FLEET_SYNCED_INTEGRATIONS_CCR_INDEX_PREFIX = 'fleet-synced-integrations-ccr-*';
const MAX_RETRY_ATTEMPTS = 5;
const RETRY_BACKOFF_MINUTES = [5, 10, 20, 40, 60];
const getSyncedIntegrationsCCRDoc = async (
esClient: ElasticsearchClient,
abortController: AbortController
): Promise<SyncIntegrationsData | undefined> => {
const indices = await esClient.indices.get(
{
index: FLEET_SYNCED_INTEGRATIONS_CCR_INDEX_PREFIX,
},
{ signal: abortController.signal }
);
const indexNames = Object.keys(indices);
if (indexNames.length > 1) {
throw new Error(
`Not supported to sync multiple indices with prefix ${FLEET_SYNCED_INTEGRATIONS_CCR_INDEX_PREFIX}`
);
}
if (indexNames.length === 0) {
return undefined;
}
const response = await esClient.search(
{
index: indexNames[0],
},
{ signal: abortController.signal }
);
if (response.hits.hits.length === 0) {
return undefined;
}
return response.hits.hits[0]._source as SyncIntegrationsData;
};
async function getSyncIntegrationsEnabled(
soClient: SavedObjectsClient,
remoteEsHosts: SyncIntegrationsData['remote_es_hosts'] | undefined
): Promise<boolean> {
const outputs = await outputService.list(soClient);
const esHosts = outputs.items
.filter((output) => output.type === 'elasticsearch')
.flatMap((output) => output.hosts);
const isSyncIntegrationsEnabled = remoteEsHosts?.some((remoteEsHost) => {
return (
remoteEsHost.sync_integrations && remoteEsHost.hosts.some((host) => esHosts.includes(host))
);
});
return isSyncIntegrationsEnabled ?? false;
}
async function installPackageIfNotInstalled(
pkg: { package_name: string; package_version: string },
packageClient: PackageClient,
logger: Logger,
abortController: AbortController
) {
const installation = await packageClient.getInstallation(pkg.package_name);
if (
installation?.install_status === 'installed' &&
semverGte(installation.version, pkg.package_version)
) {
return;
}
if (installation?.install_status === 'installing') {
return;
}
if (installation?.install_status === 'install_failed') {
const attempt = installation.latest_install_failed_attempts?.length ?? 0;
if (attempt >= MAX_RETRY_ATTEMPTS) {
return;
}
const lastRetryAttemptTime = installation.latest_install_failed_attempts?.[0].created_at;
// retry install if backoff time has passed since the last attempt
const shouldRetryInstall =
attempt > 0 &&
lastRetryAttemptTime &&
Date.now() - Date.parse(lastRetryAttemptTime) >
RETRY_BACKOFF_MINUTES[attempt - 1] * 60 * 1000;
if (!shouldRetryInstall) {
return;
}
}
try {
const installResult = await packageClient.installPackage({
pkgName: pkg.package_name,
pkgVersion: pkg.package_version,
keepFailedInstallation: true,
// using force flag because the package version might not be the latest on this cluster
force: true,
});
if (installResult.status === 'installed') {
logger.info(`Package ${pkg.package_name} installed with version ${pkg.package_version}`);
}
if (installResult.error instanceof PackageNotFoundError) {
if (abortController.signal.aborted) {
throw new Error('Task was aborted');
}
logger.warn(
`Package ${pkg.package_name} with version ${pkg.package_version} not found, trying to install latest version`
);
const installLatestResult = await packageClient.installPackage({
pkgName: pkg.package_name,
keepFailedInstallation: true,
force: true,
});
if (installLatestResult.status === 'installed') {
logger.info(`Package ${pkg.package_name} installed with version ${pkg.package_version}`);
}
}
} catch (error) {
logger.error(
`Failed to install package ${pkg.package_name} with version ${pkg.package_version}, error: ${error}`
);
}
}
export const syncIntegrationsOnRemote = async (
esClient: ElasticsearchClient,
soClient: SavedObjectsClient,
packageClient: PackageClient,
abortController: AbortController,
logger: Logger
) => {
const syncIntegrationsDoc = await getSyncedIntegrationsCCRDoc(esClient, abortController);
const isSyncIntegrationsEnabled = await getSyncIntegrationsEnabled(
soClient,
syncIntegrationsDoc?.remote_es_hosts
);
if (!isSyncIntegrationsEnabled) {
return;
}
for (const pkg of syncIntegrationsDoc?.integrations ?? []) {
if (abortController.signal.aborted) {
throw new Error('Task was aborted');
}
await installPackageIfNotInstalled(pkg, packageClient, logger, abortController);
}
};

View file

@ -14,7 +14,7 @@ import { getDeleteTaskRunResult } from '@kbn/task-manager-plugin/server/task';
import type { CoreSetup } from '@kbn/core/server';
import { loggingSystemMock } from '@kbn/core/server/mocks';
import { createAppContextStartContractMock } from '../mocks';
import { createAppContextStartContractMock, createMockPackageService } from '../mocks';
import { appContextService, outputService } from '../services';
@ -53,6 +53,8 @@ jest.mock('../services/epm/packages/get', () => ({
}),
}));
jest.mock('./sync_integrations_on_remote');
const MOCK_TASK_INSTANCE = {
id: `${TYPE}:${VERSION}`,
runAt: new Date(),
@ -79,7 +81,11 @@ describe('SyncIntegrationsTask', () => {
beforeEach(() => {
mockContract = createAppContextStartContractMock();
appContextService.start(mockContract);
mockCore = coreSetupMock();
mockCore = coreSetupMock({
pluginStartContract: {
packageService: createMockPackageService(),
},
});
mockTaskManagerSetup = tmSetupMock();
mockTask = new SyncIntegrationsTask({
core: mockCore,
@ -197,5 +203,68 @@ describe('SyncIntegrationsTask', () => {
expect(esClient.update).not.toHaveBeenCalled();
});
it('Should update fleet-synced-integrations doc if sync flag changed from true to false', async () => {
mockOutputService.list.mockResolvedValue({
items: [
{
type: 'remote_elasticsearch',
name: 'remote2',
hosts: ['https://remote2:9200'],
sync_integrations: false,
},
],
} as any);
esClient.get.mockResolvedValue({
_source: {
remote_es_hosts: [
{ hosts: ['https://remote1:9200'], name: 'remote1', sync_integrations: true },
],
},
} as any);
await runTask();
expect(esClient.update).toHaveBeenCalled();
});
it('Should not update fleet-synced-integrations doc if sync flag already false', async () => {
mockOutputService.list.mockResolvedValue({
items: [
{
type: 'remote_elasticsearch',
name: 'remote2',
hosts: ['https://remote2:9200'],
sync_integrations: false,
},
],
} as any);
esClient.get.mockResolvedValue({
_source: {
remote_es_hosts: [
{ hosts: ['https://remote1:9200'], name: 'remote1', sync_integrations: false },
],
},
} as any);
await runTask();
expect(esClient.update).not.toHaveBeenCalled();
});
it('Should not update fleet-synced-integrations doc if sync doc does not exist', async () => {
mockOutputService.list.mockResolvedValue({
items: [
{
type: 'remote_elasticsearch',
name: 'remote2',
hosts: ['https://remote2:9200'],
sync_integrations: false,
},
],
} as any);
esClient.get.mockRejectedValue({ statusCode: 404 });
await runTask();
expect(esClient.update).not.toHaveBeenCalled();
});
});
});

View file

@ -22,12 +22,14 @@ import { appContextService, outputService } from '../services';
import { getInstalledPackageSavedObjects } from '../services/epm/packages/get';
import { FLEET_SYNCED_INTEGRATIONS_INDEX_NAME } from '../services/setup/fleet_synced_integrations';
import { syncIntegrationsOnRemote } from './sync_integrations_on_remote';
export const TYPE = 'fleet:sync-integrations-task';
export const VERSION = '1.0.0';
export const VERSION = '1.0.1';
const TITLE = 'Fleet Sync Integrations Task';
const SCOPE = ['fleet'];
const INTERVAL = '5m';
const TIMEOUT = '1m';
const TIMEOUT = '5m';
interface SyncIntegrationsTaskSetupContract {
core: CoreSetup;
@ -39,7 +41,7 @@ interface SyncIntegrationsTaskStartContract {
taskManager: TaskManagerStartContract;
}
interface SyncIntegrationsData {
export interface SyncIntegrationsData {
remote_es_hosts: Array<{
name: string;
hosts: string[];
@ -127,7 +129,7 @@ export class SyncIntegrationsTask {
this.logger.info(`[runTask()] started`);
const [coreStart] = await core.getStartServices();
const [coreStart, _startDeps, { packageService }] = (await core.getStartServices()) as any;
const esClient = coreStart.elasticsearch.client.asInternalUser;
const soClient = new SavedObjectsClient(coreStart.savedObjects.createInternalRepository());
@ -137,23 +139,19 @@ export class SyncIntegrationsTask {
return;
}
const indexExists = await esClient.indices.exists(
{
index: FLEET_SYNCED_INTEGRATIONS_INDEX_NAME,
},
{ signal: this.abortController.signal }
);
if (!indexExists) {
this.logger.info(
`[SyncIntegrationsTask] index ${FLEET_SYNCED_INTEGRATIONS_INDEX_NAME} does not exist`
);
return;
}
try {
// write integrations on main cluster
await this.updateSyncedIntegrationsData(esClient, soClient);
// sync integrations on remote cluster
await syncIntegrationsOnRemote(
esClient,
soClient,
packageService.asInternalUser,
this.abortController,
this.logger
);
this.endRun('success');
} catch (err) {
if (err instanceof errors.RequestAbortedError) {
@ -166,10 +164,46 @@ export class SyncIntegrationsTask {
}
};
private syncedIntegrationsIndexExists = async (esClient: ElasticsearchClient) => {
return await esClient.indices.exists(
{
index: FLEET_SYNCED_INTEGRATIONS_INDEX_NAME,
},
{ signal: this.abortController.signal }
);
};
private hadAnyRemoteESSyncEnabled = async (esClient: ElasticsearchClient): Promise<boolean> => {
try {
const res = await esClient.get({
id: FLEET_SYNCED_INTEGRATIONS_INDEX_NAME,
index: FLEET_SYNCED_INTEGRATIONS_INDEX_NAME,
});
if (!(res._source as any)?.remote_es_hosts.some((host: any) => host.sync_integrations)) {
return false;
}
} catch (error) {
if (error.statusCode === 404) {
return false;
}
throw error;
}
return true;
};
private updateSyncedIntegrationsData = async (
esClient: ElasticsearchClient,
soClient: SavedObjectsClient
) => {
const indexExists = await this.syncedIntegrationsIndexExists(esClient);
if (!indexExists) {
this.logger.info(
`[SyncIntegrationsTask] index ${FLEET_SYNCED_INTEGRATIONS_INDEX_NAME} does not exist`
);
return;
}
const outputs = await outputService.list(soClient);
const remoteESOutputs = outputs.items.filter(
(output) => output.type === outputType.RemoteElasticsearch
@ -179,7 +213,10 @@ export class SyncIntegrationsTask {
);
if (!isSyncEnabled) {
return;
const hadAnyRemoteESSyncEnabled = await this.hadAnyRemoteESSyncEnabled(esClient);
if (!hadAnyRemoteESSyncEnabled) {
return;
}
}
const newDoc: SyncIntegrationsData = {