Introduce Kibana task to deploy agentless connectors for 9.0 (#203973)

## Closes https://github.com/elastic/search-team/issues/8508
## Closes https://github.com/elastic/search-team/issues/8465

## Summary

This PR adds a background task for search_connectors plugin. This task
checks connector records and agentless package policies and sees if new
connector was added/old was deleted, and then adds/deletes package
policies for these connectors.

Scenario 1: a new connector was added by a user/API call

User creates an Elastic-managed connector:


https://github.com/user-attachments/assets/38296e48-b281-4b2b-9750-ab0a47334b55

When the user is done, a package policy is created by this background
task:


https://github.com/user-attachments/assets/12dbc33f-32bf-472d-b854-64588fc1e5b1

Scenario 2: a connector was deleted by a user/API call

User deletes an Elastic-managed connector:


https://github.com/user-attachments/assets/5997897e-fb9d-4199-8045-abe163264976

### Checklist

Check the PR satisfies following conditions. 

Reviewers should verify this PR satisfies this list as well.

- [ ] Any text added follows [EUI's writing
guidelines](https://elastic.github.io/eui/#/guidelines/writing), uses
sentence case text and includes [i18n
support](https://github.com/elastic/kibana/blob/main/packages/kbn-i18n/README.md)
- [ ]
[Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html)
was added for features that require explanation or tutorials
- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [ ] If a plugin configuration key changed, check if it needs to be
allowlisted in the cloud and added to the [docker
list](https://github.com/elastic/kibana/blob/main/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker)
- [ ] This was checked for breaking HTTP API changes, and any breaking
changes have been approved by the breaking-change committee. The
`release_note:breaking` label should be applied in these situations.
- [ ] [Flaky Test
Runner](https://ci-stats.kibana.dev/trigger_flaky_test_runner/1) was
used on any tests changed
- [x] The PR description includes the appropriate Release Notes section,
and the correct `release_note:*` label is applied per the
[guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Jedr Blaszyk <jedrazb@gmail.com>
This commit is contained in:
Artem Shelkovnikov 2025-01-10 12:22:00 +01:00 committed by GitHub
parent abf00ee777
commit c88d519bff
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
28 changed files with 1448 additions and 50 deletions

View file

@ -228,8 +228,10 @@ export const createPackagePolicyServiceMock = (): jest.Mocked<PackagePolicyClien
*/
export const createMockAgentPolicyService = (): jest.Mocked<AgentPolicyServiceInterface> => {
return {
create: jest.fn().mockReturnValue(Promise.resolve()),
get: jest.fn().mockReturnValue(Promise.resolve()),
list: jest.fn().mockReturnValue(Promise.resolve()),
delete: jest.fn().mockReturnValue(Promise.resolve()),
getFullAgentPolicy: jest.fn().mockReturnValue(Promise.resolve()),
getByIds: jest.fn().mockReturnValue(Promise.resolve()),
turnOffAgentTamperProtections: jest.fn().mockReturnValue(Promise.resolve()),

View file

@ -816,16 +816,7 @@ export class FleetPlugin
core.elasticsearch.client.asInternalUser,
internalSoClient
),
agentPolicyService: {
get: agentPolicyService.get,
list: agentPolicyService.list,
getFullAgentPolicy: agentPolicyService.getFullAgentPolicy,
getByIds: agentPolicyService.getByIDs,
turnOffAgentTamperProtections:
agentPolicyService.turnOffAgentTamperProtections.bind(agentPolicyService),
fetchAllAgentPolicies: agentPolicyService.fetchAllAgentPolicies,
fetchAllAgentPolicyIds: agentPolicyService.fetchAllAgentPolicyIds,
},
agentPolicyService,
packagePolicyService,
registerExternalCallback: (type: ExternalCallback[0], callback: ExternalCallback[1]) => {
return appContextService.addExternalCallback(type, callback);

View file

@ -196,7 +196,7 @@ export const bulkGetAgentPoliciesHandler: FleetRequestHandler<
'full query parameter require agent policies read permissions'
);
}
let items = await agentPolicyService.getByIDs(soClient, ids, {
let items = await agentPolicyService.getByIds(soClient, ids, {
withPackagePolicies,
ignoreMissing,
});
@ -687,7 +687,7 @@ export const GetListAgentPolicyOutputsHandler: FleetRequestHandler<
body: { items: [] },
});
}
const agentPolicies = await agentPolicyService.getByIDs(soClient, ids, {
const agentPolicies = await agentPolicyService.getByIds(soClient, ids, {
withPackagePolicies: true,
});

View file

@ -22,7 +22,7 @@ import {
jest.mock('../../services', () => ({
agentPolicyService: {
get: jest.fn(),
getByIDs: jest.fn(),
getByIds: jest.fn(),
},
appContextService: {
getInternalUserSOClientWithoutSpaceExtension: jest.fn(),

View file

@ -42,7 +42,7 @@ jest.mock('../../services', () => ({
},
agentPolicyService: {
get: jest.fn(),
getByIDs: jest.fn(),
getByIds: jest.fn(),
},
}));

View file

@ -504,7 +504,7 @@ describe('Agent policy', () => {
});
});
describe('getByIDs', () => {
describe('getByIds', () => {
it('should call audit logger', async () => {
const soClient = savedObjectsClientMock.create();
@ -525,7 +525,7 @@ describe('Agent policy', () => {
],
});
await agentPolicyService.getByIDs(soClient, ['test-agent-policy-1', 'test-agent-policy-2']);
await agentPolicyService.getByIds(soClient, ['test-agent-policy-1', 'test-agent-policy-2']);
expect(mockedAuditLoggingService.writeCustomSoAuditLog).toHaveBeenNthCalledWith(1, {
action: 'get',

View file

@ -507,7 +507,7 @@ class AgentPolicyService {
return agentPolicy;
}
public async getByIDs(
public async getByIds(
soClient: SavedObjectsClientContract,
ids: Array<string | { id: string; spaceId?: string }>,
options: { fields?: string[]; withPackagePolicies?: boolean; ignoreMissing?: boolean } = {}
@ -1345,7 +1345,7 @@ class AgentPolicyService {
});
}
const policies = await agentPolicyService.getByIDs(soClient, agentPolicyIds);
const policies = await agentPolicyService.getByIds(soClient, agentPolicyIds);
const policiesMap = keyBy(policies, 'id');
const fullPolicies = await pMap(
agentPolicyIds,

View file

@ -14,7 +14,7 @@ import { getHostedPolicies, isHostedAgent } from './hosted_agent';
jest.mock('../agent_policy', () => {
return {
agentPolicyService: {
getByIDs: jest.fn().mockResolvedValue([
getByIds: jest.fn().mockResolvedValue([
{ id: 'hosted-policy', is_managed: true },
{ id: 'regular-policy', is_managed: false },
]),

View file

@ -20,7 +20,7 @@ export async function getHostedPolicies(
);
// get the agent policies for those ids
const agentPolicies = await agentPolicyService.getByIDs(soClient, Array.from(policyIdsToGet), {
const agentPolicies = await agentPolicyService.getByIds(soClient, Array.from(policyIdsToGet), {
fields: ['is_managed'],
ignoreMissing: true,
});

View file

@ -34,7 +34,7 @@ jest.mock('../agent_policy', () => {
return {
agentPolicyService: {
getInactivityTimeouts: jest.fn().mockResolvedValue([]),
getByIDs: jest.fn().mockResolvedValue([{ id: 'hosted-agent-policy', is_managed: true }]),
getByIds: jest.fn().mockResolvedValue([{ id: 'hosted-agent-policy', is_managed: true }]),
list: jest.fn().mockResolvedValue({ items: [] }),
},
};

View file

@ -204,7 +204,7 @@ describe('getFleetServerPolicies', () => {
page: 1,
perPage: mockPackagePolicies.length,
});
(mockedAgentPolicyService.getByIDs as jest.Mock).mockResolvedValueOnce(mockFleetServerPolicies);
(mockedAgentPolicyService.getByIds as jest.Mock).mockResolvedValueOnce(mockFleetServerPolicies);
const result = await getFleetServerPolicies(soClient);
expect(result).toEqual(mockFleetServerPolicies);
});

View file

@ -41,7 +41,7 @@ export const getFleetServerPolicies = async (
// Retrieve associated agent policies
const fleetServerAgentPolicies = fleetServerAgentPolicyIds.length
? await agentPolicyService.getByIDs(
? await agentPolicyService.getByIds(
soClient,
uniqBy(fleetServerAgentPolicyIds, (p) => p.id)
)

View file

@ -15,10 +15,12 @@ export { getRegistryUrl } from './epm/registry/registry_url';
*/
export interface AgentPolicyServiceInterface {
create: (typeof agentPolicyService)['create'];
get: (typeof agentPolicyService)['get'];
list: (typeof agentPolicyService)['list'];
delete: (typeof agentPolicyService)['delete'];
getFullAgentPolicy: (typeof agentPolicyService)['getFullAgentPolicy'];
getByIds: (typeof agentPolicyService)['getByIDs'];
getByIds: (typeof agentPolicyService)['getByIds'];
turnOffAgentTamperProtections: (typeof agentPolicyService)['turnOffAgentTamperProtections'];
fetchAllAgentPolicyIds: (typeof agentPolicyService)['fetchAllAgentPolicyIds'];
fetchAllAgentPolicies: (typeof agentPolicyService)['fetchAllAgentPolicies'];

View file

@ -316,7 +316,7 @@ describe('Output Service', () => {
} as unknown as ReturnType<typeof mockedPackagePolicyService.list>;
beforeEach(() => {
mockedAgentPolicyService.getByIDs.mockResolvedValue([]);
mockedAgentPolicyService.getByIds.mockResolvedValue([]);
mockedAgentPolicyService.list.mockClear();
mockedPackagePolicyService.list.mockReset();
mockedAgentPolicyService.hasAPMIntegration.mockClear();
@ -334,7 +334,7 @@ describe('Output Service', () => {
});
afterEach(() => {
mockedAgentPolicyService.getByIDs.mockClear();
mockedAgentPolicyService.getByIds.mockClear();
});
describe('create', () => {
@ -688,7 +688,7 @@ describe('Output Service', () => {
mockedPackagePolicyService.list.mockResolvedValue(
mockedPackagePolicyWithFleetServerResolvedValue
);
mockedAgentPolicyService.getByIDs.mockResolvedValue(
mockedAgentPolicyService.getByIds.mockResolvedValue(
(await mockedAgentPolicyWithFleetServerResolvedValue).items
);
@ -727,7 +727,7 @@ describe('Output Service', () => {
mockedPackagePolicyService.list.mockResolvedValue(
mockedPackagePolicyWithSyntheticsResolvedValue
);
mockedAgentPolicyService.getByIDs.mockResolvedValue(
mockedAgentPolicyService.getByIds.mockResolvedValue(
(await mockedAgentPolicyWithSyntheticsResolvedValue).items
);
@ -845,7 +845,7 @@ describe('Output Service', () => {
mockedPackagePolicyService.list.mockResolvedValue(
mockedPackagePolicyWithFleetServerResolvedValue
);
mockedAgentPolicyService.getByIDs.mockResolvedValue(
mockedAgentPolicyService.getByIds.mockResolvedValue(
(await mockedAgentPolicyWithFleetServerResolvedValue).items
);
@ -884,7 +884,7 @@ describe('Output Service', () => {
mockedPackagePolicyService.list.mockResolvedValue(
mockedPackagePolicyWithSyntheticsResolvedValue
);
mockedAgentPolicyService.getByIDs.mockResolvedValue(
mockedAgentPolicyService.getByIds.mockResolvedValue(
(await mockedAgentPolicyWithSyntheticsResolvedValue).items
);

View file

@ -176,7 +176,7 @@ async function getAgentPoliciesPerOutput(outputId?: string, isDefault?: boolean)
}, [])
),
];
const agentPoliciesFromPackagePolicies = await agentPolicyService.getByIDs(
const agentPoliciesFromPackagePolicies = await agentPolicyService.getByIds(
internalSoClientWithoutSpaceExtension,
agentPolicyIdsFromPackagePolicies
);
@ -245,7 +245,7 @@ async function findPoliciesWithFleetServerOrSynthetics(outputId?: string, isDefa
);
const agentPolicyIds = _.uniq(packagePolicies.flatMap((p) => p.policy_ids));
if (agentPolicyIds.length) {
agentPolicies = await agentPolicyService.getByIDs(
agentPolicies = await agentPolicyService.getByIds(
internalSoClientWithoutSpaceExtension,
agentPolicyIds
);

View file

@ -227,7 +227,7 @@ const mockAgentPolicyGet = (spaceIds: string[] = ['default']) => {
});
}
);
mockAgentPolicyService.getByIDs.mockImplementation(
mockAgentPolicyService.getByIds.mockImplementation(
// @ts-ignore
(_soClient: SavedObjectsClientContract, ids: string[]) => {
return Promise.resolve(

View file

@ -518,7 +518,7 @@ class PackagePolicyClientImpl implements PackagePolicyClient {
const agentPolicyIds = new Set(packagePolicies.flatMap((pkgPolicy) => pkgPolicy.policy_ids));
const agentPolicies = await agentPolicyService.getByIDs(soClient, [...agentPolicyIds]);
const agentPolicies = await agentPolicyService.getByIds(soClient, [...agentPolicyIds]);
const agentPoliciesIndexById = indexBy('id', agentPolicies);
for (const agentPolicy of agentPolicies) {
validateIsNotHostedPolicy(agentPolicy, options?.force);
@ -1551,7 +1551,7 @@ class PackagePolicyClientImpl implements PackagePolicyClient {
return acc;
}, new Set());
const agentPolicies = await agentPolicyService.getByIDs(soClient, uniquePolicyIdsR);
const agentPolicies = await agentPolicyService.getByIds(soClient, uniquePolicyIdsR);
for (const policyId of uniquePolicyIdsR) {
const agentPolicy = agentPolicies.find((p) => p.id === policyId);

View file

@ -217,7 +217,7 @@ describe('UninstallTokenService', () => {
agentPolicyService.deployPolicies = jest.fn();
getAgentPoliciesByIDsMock = jest.fn().mockResolvedValue([]);
agentPolicyService.getByIDs = getAgentPoliciesByIDsMock;
agentPolicyService.getByIds = getAgentPoliciesByIDsMock;
if (scoppedInSpace) {
soClientMock.getCurrentNamespace.mockReturnValue(scoppedInSpace);

View file

@ -305,7 +305,7 @@ export class UninstallTokenService implements UninstallTokenServiceInterface {
}
private async getPolicyIdNameDictionary(policyIds: string[]): Promise<Record<string, string>> {
const agentPolicies = await agentPolicyService.getByIDs(this.soClient, policyIds, {
const agentPolicies = await agentPolicyService.getByIds(this.soClient, policyIds, {
ignoreMissing: true,
});
@ -615,7 +615,7 @@ export class UninstallTokenService implements UninstallTokenServiceInterface {
const batchSize = config?.setup?.agentPolicySchemaUpgradeBatchSize ?? 100;
await asyncForEach(chunk(policyIds, batchSize), async (policyIdsBatch) => {
const policies = await agentPolicyService.getByIDs(
const policies = await agentPolicyService.getByIds(
appContextService.getInternalUserSOClientWithoutSpaceExtension(),
policyIds.map((id) => ({ id, spaceId: '*' }))
);

View file

@ -17,8 +17,11 @@
"search",
"connectors"
],
"requiredPlugins": [],
"optionalPlugins": [],
"requiredBundles": []
"requiredPlugins": [
"licensing",
"taskManager",
"fleet"
],
"optionalPlugins": []
}
}

View file

@ -5,40 +5,94 @@
* 2.0.
*/
import type { PluginInitializerContext, Plugin, CoreSetup } from '@kbn/core/server';
import type {
PluginInitializerContext,
Plugin,
CoreStart,
CoreSetup,
Logger,
} from '@kbn/core/server';
import { ConnectorServerSideDefinition } from '@kbn/search-connectors';
import { isAgentlessEnabled } from '@kbn/fleet-plugin/server/services/utils/agentless';
import { getConnectorTypes } from '../common/lib/connector_types';
import type {
SearchConnectorsPluginSetup as SearchConnectorsPluginSetup,
SearchConnectorsPluginStart as SearchConnectorsPluginStart,
SetupDependencies,
StartDependencies,
SearchConnectorsPluginSetupDependencies,
SearchConnectorsPluginStartDependencies,
} from './types';
import { AgentlessConnectorDeploymentsSyncService } from './task';
import { SearchConnectorsConfig } from './config';
export class SearchConnectorsPlugin
implements
Plugin<
SearchConnectorsPluginSetup,
SearchConnectorsPluginStart,
SetupDependencies,
StartDependencies
SearchConnectorsPluginSetupDependencies,
SearchConnectorsPluginStartDependencies
>
{
private connectors: ConnectorServerSideDefinition[];
private log: Logger;
private readonly config: SearchConnectorsConfig;
private agentlessConnectorDeploymentsSyncService: AgentlessConnectorDeploymentsSyncService;
constructor(initializerContext: PluginInitializerContext) {
this.connectors = [];
this.log = initializerContext.logger.get();
this.config = initializerContext.config.get();
this.agentlessConnectorDeploymentsSyncService = new AgentlessConnectorDeploymentsSyncService(
this.log
);
}
public setup({ getStartServices, http }: CoreSetup<StartDependencies>) {
public setup(
coreSetup: CoreSetup<SearchConnectorsPluginStartDependencies, SearchConnectorsPluginStart>,
plugins: SearchConnectorsPluginSetupDependencies
) {
const http = coreSetup.http;
this.connectors = getConnectorTypes(http.staticAssets);
const coreStartServices = coreSetup.getStartServices();
// There seems to be no way to check for agentless here
// So we register a task, but do not execute it in `start` method
this.log.debug('Registering agentless connectors infra sync task');
coreStartServices
.then(([coreStart, searchConnectorsPluginStartDependencies]) => {
this.agentlessConnectorDeploymentsSyncService.registerInfraSyncTask(
plugins,
coreStart,
searchConnectorsPluginStartDependencies
);
})
.catch((err) => {
this.log.error(`Error registering agentless connectors infra sync task`, err);
});
return {
getConnectorTypes: () => this.connectors,
};
}
public start() {
public start(coreStart: CoreStart, plugins: SearchConnectorsPluginStartDependencies) {
if (isAgentlessEnabled()) {
this.log.info(
'Agentless is supported, scheduling initial agentless connectors infrastructure watcher task'
);
this.agentlessConnectorDeploymentsSyncService
.scheduleInfraSyncTask(this.config, plugins.taskManager)
.catch((err) => {
this.log.error(`Error scheduling agentless connectors infra sync task`, err);
});
} else {
this.log.info(
'Agentless is not supported, skipping scheduling initial agentless connectors infrastructure watcher task'
);
}
return {
getConnectors: () => this.connectors,
};

View file

@ -0,0 +1,631 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { SavedObjectsClientContract } from '@kbn/core-saved-objects-api-server';
import {
ElasticsearchClientMock,
elasticsearchClientMock,
} from '@kbn/core-elasticsearch-client-server-mocks';
import {
AgentlessConnectorsInfraService,
ConnectorMetadata,
PackagePolicyMetadata,
getConnectorsWithoutPolicies,
getPoliciesWithoutConnectors,
} from '.';
import { savedObjectsClientMock } from '@kbn/core/server/mocks';
import { MockedLogger, loggerMock } from '@kbn/logging-mocks';
import {
createPackagePolicyServiceMock,
createMockAgentPolicyService,
} from '@kbn/fleet-plugin/server/mocks';
import { AgentPolicyServiceInterface, PackagePolicyClient } from '@kbn/fleet-plugin/server';
import { AgentPolicy, PackagePolicy, PackagePolicyInput } from '@kbn/fleet-plugin/common';
import { createAgentPolicyMock, createPackagePolicyMock } from '@kbn/fleet-plugin/common/mocks';
jest.mock('@kbn/fleet-plugin/server/services/epm/packages', () => {
const mockedGetPackageInfo = ({ pkgName }: { pkgName: string }) => {
if (pkgName === 'elastic_connectors') {
const pkg = {
version: '0.0.5',
policy_templates: [
{
name: 'github_elastic_connectors',
inputs: [
{
type: 'connectors-py',
vars: [
{
name: 'connector_id',
required: false,
type: 'string',
},
{
name: 'connector_name',
required: false,
type: 'string',
},
{
name: 'service_type',
required: false,
type: 'string',
},
],
},
],
},
],
};
return Promise.resolve(pkg);
}
};
return {
getPackageInfo: jest.fn().mockImplementation(mockedGetPackageInfo),
};
});
describe('AgentlessConnectorsInfraService', () => {
let soClient: jest.Mocked<SavedObjectsClientContract>;
let esClient: ElasticsearchClientMock;
let packagePolicyService: jest.Mocked<PackagePolicyClient>;
let agentPolicyInterface: jest.Mocked<AgentPolicyServiceInterface>;
let logger: MockedLogger;
let service: AgentlessConnectorsInfraService;
beforeEach(async () => {
soClient = savedObjectsClientMock.create();
esClient = elasticsearchClientMock.createClusterClient().asInternalUser;
packagePolicyService = createPackagePolicyServiceMock();
agentPolicyInterface = createMockAgentPolicyService();
logger = loggerMock.create();
service = new AgentlessConnectorsInfraService(
soClient,
esClient,
packagePolicyService,
agentPolicyInterface,
logger
);
jest.clearAllMocks();
});
describe('getNativeConnectors', () => {
test('Lists only native connectors', async () => {
const mockResult = {
results: [
{
id: '00000001',
name: 'Sharepoint Online Production Connector',
service_type: 'sharepoint_online',
is_native: false,
},
{
id: '00000002',
name: 'Github Connector for ACME Organisation',
service_type: 'github',
is_native: true,
},
],
count: 2,
};
esClient.transport.request.mockResolvedValue(mockResult);
const nativeConnectors = await service.getNativeConnectors();
expect(nativeConnectors.length).toBe(1);
expect(nativeConnectors[0].id).toBe(mockResult.results[1].id);
expect(nativeConnectors[0].name).toBe(mockResult.results[1].name);
expect(nativeConnectors[0].service_type).toBe(mockResult.results[1].service_type);
});
test('Lists only supported service types', async () => {
const mockResult = {
results: [
{
id: '00000001',
name: 'Sharepoint Online Production Connector',
service_type: 'sharepoint_online',
is_native: true,
},
{
id: '00000002',
name: 'Github Connector for ACME Organisation',
service_type: 'github',
is_native: true,
},
{
id: '00000003',
name: 'Connector with unexpected service_type',
service_type: 'crawler',
is_native: true,
},
{
id: '00000004',
name: 'Connector with no service_type',
service_type: null,
is_native: true,
},
],
count: 4,
};
esClient.transport.request.mockResolvedValue(mockResult);
const nativeConnectors = await service.getNativeConnectors();
expect(nativeConnectors.length).toBe(2);
expect(nativeConnectors[0].id).toBe(mockResult.results[0].id);
expect(nativeConnectors[0].name).toBe(mockResult.results[0].name);
expect(nativeConnectors[0].service_type).toBe(mockResult.results[0].service_type);
expect(nativeConnectors[1].id).toBe(mockResult.results[1].id);
expect(nativeConnectors[1].name).toBe(mockResult.results[1].name);
expect(nativeConnectors[1].service_type).toBe(mockResult.results[1].service_type);
});
});
describe('getConnectorPackagePolicies', () => {
const getMockPolicyFetchAllItems = (pages: PackagePolicy[][]) => {
return {
async *[Symbol.asyncIterator]() {
for (const page of pages) {
yield page;
}
},
} as AsyncIterable<PackagePolicy[]>;
};
test('Lists only policies with expected input', async () => {
const firstPackagePolicy = createPackagePolicyMock();
firstPackagePolicy.id = 'this-is-package-policy-id';
firstPackagePolicy.policy_ids = ['this-is-agent-policy-id'];
firstPackagePolicy.supports_agentless = true;
firstPackagePolicy.inputs = [
{
type: 'connectors-py',
compiled_input: {
connector_id: '00000001',
connector_name: 'Sharepoint Online Production Connector',
service_type: 'sharepoint_online',
},
} as PackagePolicyInput,
];
const secondPackagePolicy = createPackagePolicyMock();
secondPackagePolicy.supports_agentless = true;
const thirdPackagePolicy = createPackagePolicyMock();
thirdPackagePolicy.supports_agentless = true;
thirdPackagePolicy.inputs = [
{
type: 'something-unsupported',
compiled_input: {
connector_id: '00000001',
connector_name: 'Sharepoint Online Production Connector',
service_type: 'sharepoint_online',
},
} as PackagePolicyInput,
];
packagePolicyService.fetchAllItems.mockResolvedValue(
getMockPolicyFetchAllItems([[firstPackagePolicy, secondPackagePolicy, thirdPackagePolicy]])
);
const policies = await service.getConnectorPackagePolicies();
expect(policies.length).toBe(1);
expect(policies[0].package_policy_id).toBe(firstPackagePolicy.id);
expect(policies[0].connector_metadata.id).toBe(
firstPackagePolicy.inputs[0].compiled_input.connector_id
);
expect(policies[0].connector_metadata.name).toBe(
firstPackagePolicy.inputs[0].compiled_input.connector_name
);
expect(policies[0].connector_metadata.service_type).toBe(
firstPackagePolicy.inputs[0].compiled_input.service_type
);
expect(policies[0].agent_policy_ids).toBe(firstPackagePolicy.policy_ids);
});
test('Lists policies if they are returned over multiple pages', async () => {
const firstPackagePolicy = createPackagePolicyMock();
firstPackagePolicy.id = 'this-is-package-policy-id';
firstPackagePolicy.policy_ids = ['this-is-agent-policy-id'];
firstPackagePolicy.supports_agentless = true;
firstPackagePolicy.inputs = [
{
type: 'connectors-py',
compiled_input: {
connector_id: '00000001',
connector_name: 'Sharepoint Online Production Connector',
service_type: 'sharepoint_online',
},
} as PackagePolicyInput,
];
const secondPackagePolicy = createPackagePolicyMock();
secondPackagePolicy.supports_agentless = true;
const thirdPackagePolicy = createPackagePolicyMock();
thirdPackagePolicy.supports_agentless = true;
thirdPackagePolicy.inputs = [
{
type: 'connectors-py',
compiled_input: {
connector_id: '00000003',
connector_name: 'Sharepoint Online Production Connector',
service_type: 'github',
},
} as PackagePolicyInput,
];
packagePolicyService.fetchAllItems.mockResolvedValue(
getMockPolicyFetchAllItems([
[firstPackagePolicy],
[secondPackagePolicy],
[thirdPackagePolicy],
])
);
const policies = await service.getConnectorPackagePolicies();
expect(policies.length).toBe(2);
expect(policies[0].package_policy_id).toBe(firstPackagePolicy.id);
expect(policies[0].connector_metadata.id).toBe(
firstPackagePolicy.inputs[0].compiled_input.connector_id
);
expect(policies[0].connector_metadata.name).toBe(
firstPackagePolicy.inputs[0].compiled_input.connector_name
);
expect(policies[0].connector_metadata.service_type).toBe(
firstPackagePolicy.inputs[0].compiled_input.service_type
);
expect(policies[0].agent_policy_ids).toBe(firstPackagePolicy.policy_ids);
expect(policies[1].package_policy_id).toBe(thirdPackagePolicy.id);
expect(policies[1].connector_metadata.id).toBe(
thirdPackagePolicy.inputs[0].compiled_input.connector_id
);
expect(policies[1].connector_metadata.name).toBe(
thirdPackagePolicy.inputs[0].compiled_input.connector_name
);
expect(policies[1].connector_metadata.service_type).toBe(
thirdPackagePolicy.inputs[0].compiled_input.service_type
);
expect(policies[1].agent_policy_ids).toBe(thirdPackagePolicy.policy_ids);
});
test('Skips policies that have missing fields', async () => {
const firstPackagePolicy = createPackagePolicyMock();
firstPackagePolicy.id = 'this-is-package-policy-id';
firstPackagePolicy.policy_ids = ['this-is-agent-policy-id'];
firstPackagePolicy.inputs = [
{
type: 'connectors-py',
compiled_input: {
connector_id: '00000001',
connector_name: 'Sharepoint Online Production Connector',
},
} as PackagePolicyInput,
];
const secondPackagePolicy = createPackagePolicyMock();
secondPackagePolicy.inputs = [
{
type: 'connectors-py',
compiled_input: {
connector_name: 'Sharepoint Online Production Connector',
service_type: 'github',
},
} as PackagePolicyInput,
];
packagePolicyService.fetchAllItems.mockResolvedValue(
getMockPolicyFetchAllItems([[firstPackagePolicy], [secondPackagePolicy]])
);
const policies = await service.getConnectorPackagePolicies();
expect(policies.length).toBe(0);
});
});
describe('deployConnector', () => {
let agentPolicy: AgentPolicy;
let sharepointOnlinePackagePolicy: PackagePolicy;
beforeAll(() => {
agentPolicy = createAgentPolicyMock();
sharepointOnlinePackagePolicy = createPackagePolicyMock();
sharepointOnlinePackagePolicy.id = 'this-is-package-policy-id';
sharepointOnlinePackagePolicy.policy_ids = ['this-is-agent-policy-id'];
sharepointOnlinePackagePolicy.inputs = [
{
type: 'connectors-py',
compiled_input: {
connector_id: '00000001',
connector_name: 'Sharepoint Online Production Connector',
service_type: 'sharepoint_online',
},
} as PackagePolicyInput,
];
});
test('Raises an error if connector.id is missing', async () => {
const connector = {
id: '',
name: 'something',
service_type: 'github',
};
try {
await service.deployConnector(connector);
expect(true).toBe(false);
} catch (e) {
expect(e.message).toContain('Connector id');
}
});
test('Raises an error if connector.service_type is missing', async () => {
const connector = {
id: '000000001',
name: 'something',
service_type: '',
};
try {
await service.deployConnector(connector);
expect(true).toBe(false);
} catch (e) {
expect(e.message).toContain('service_type');
}
});
test('Raises an error if connector.service_type is unsupported', async () => {
const connector = {
id: '000000001',
name: 'something',
service_type: 'crawler',
};
try {
await service.deployConnector(connector);
expect(true).toBe(false);
} catch (e) {
expect(e.message).toContain('service_type');
expect(e.message).toContain('incompatible');
}
});
test('Does not swallow an error if agent policy creation failed', async () => {
const connector = {
id: '000000001',
name: 'something',
service_type: 'github',
};
const errorMessage = 'Failed to create an agent policy hehe';
agentPolicyInterface.create.mockImplementation(() => {
throw new Error(errorMessage);
});
try {
await service.deployConnector(connector);
expect(true).toBe(false);
} catch (e) {
expect(e.message).toEqual(errorMessage);
}
});
test('Does not swallow an error if package policy creation failed', async () => {
const connector = {
id: '000000001',
name: 'something',
service_type: 'github',
};
const errorMessage = 'Failed to create a package policy hehe';
agentPolicyInterface.create.mockResolvedValue(agentPolicy);
packagePolicyService.create.mockImplementation(() => {
throw new Error(errorMessage);
});
try {
await service.deployConnector(connector);
expect(true).toBe(false);
} catch (e) {
expect(e.message).toEqual(errorMessage);
}
});
test('Returns a created package policy when all goes well', async () => {
const connector = {
id: '000000001',
name: 'something',
service_type: 'github',
};
agentPolicyInterface.create.mockResolvedValue(agentPolicy);
packagePolicyService.create.mockResolvedValue(sharepointOnlinePackagePolicy);
const result = await service.deployConnector(connector);
expect(result).toBe(sharepointOnlinePackagePolicy);
});
});
describe('removeDeployment', () => {
const packagePolicyId = 'this-is-package-policy-id';
const agentPolicyId = 'this-is-agent-policy-id';
let sharepointOnlinePackagePolicy: PackagePolicy;
beforeAll(() => {
sharepointOnlinePackagePolicy = createPackagePolicyMock();
sharepointOnlinePackagePolicy.id = packagePolicyId;
sharepointOnlinePackagePolicy.policy_ids = [agentPolicyId];
sharepointOnlinePackagePolicy.inputs = [
{
type: 'connectors-py',
compiled_input: {
connector_id: '00000001',
connector_name: 'Sharepoint Online Production Connector',
service_type: 'sharepoint_online',
},
} as PackagePolicyInput,
];
});
test('Calls for deletion of both agent policy and package policy', async () => {
packagePolicyService.get.mockResolvedValue(sharepointOnlinePackagePolicy);
await service.removeDeployment(packagePolicyId);
expect(agentPolicyInterface.delete).toBeCalledWith(soClient, esClient, agentPolicyId);
expect(packagePolicyService.delete).toBeCalledWith(soClient, esClient, [packagePolicyId]);
});
test('Raises an error if deletion of agent policy failed', async () => {
packagePolicyService.get.mockResolvedValue(sharepointOnlinePackagePolicy);
const errorMessage = 'Failed to create a package policy hehe';
agentPolicyInterface.delete.mockImplementation(() => {
throw new Error(errorMessage);
});
try {
await service.removeDeployment(packagePolicyId);
expect(true).toBe(false);
} catch (e) {
expect(e.message).toEqual(errorMessage);
}
});
test('Raises an error if deletion of package policy failed', async () => {
packagePolicyService.get.mockResolvedValue(sharepointOnlinePackagePolicy);
const errorMessage = 'Failed to create a package policy hehe';
packagePolicyService.delete.mockImplementation(() => {
throw new Error(errorMessage);
});
try {
await service.removeDeployment(packagePolicyId);
expect(true).toBe(false);
} catch (e) {
expect(e.message).toEqual(errorMessage);
}
});
test('Raises an error if a policy is not found', async () => {
packagePolicyService.get.mockResolvedValue(null);
try {
await service.removeDeployment(packagePolicyId);
expect(true).toBe(false);
} catch (e) {
expect(e.message).toContain('Failed to delete policy');
expect(e.message).toContain(packagePolicyId);
}
});
});
});
describe('module', () => {
const githubConnector: ConnectorMetadata = {
id: '000001',
name: 'Github Connector',
service_type: 'github',
};
const sharepointConnector: ConnectorMetadata = {
id: '000002',
name: 'Sharepoint Connector',
service_type: 'sharepoint_online',
};
const mysqlConnector: ConnectorMetadata = {
id: '000003',
name: 'MySQL Connector',
service_type: 'mysql',
};
const githubPackagePolicy: PackagePolicyMetadata = {
package_policy_id: 'agent-001',
agent_policy_ids: ['agent-package-001'],
connector_metadata: githubConnector,
};
const sharepointPackagePolicy: PackagePolicyMetadata = {
package_policy_id: 'agent-002',
agent_policy_ids: ['agent-package-002'],
connector_metadata: sharepointConnector,
};
const mysqlPackagePolicy: PackagePolicyMetadata = {
package_policy_id: 'agent-003',
agent_policy_ids: ['agent-package-003'],
connector_metadata: mysqlConnector,
};
describe('getPoliciesWithoutConnectors', () => {
test('Returns a missing policy if one is missing', async () => {
const missingPolicies = getPoliciesWithoutConnectors(
[githubPackagePolicy, sharepointPackagePolicy, mysqlPackagePolicy],
[githubConnector, sharepointConnector]
);
expect(missingPolicies.length).toBe(1);
expect(missingPolicies).toContain(mysqlPackagePolicy);
});
test('Returns empty array if no policies are missing', async () => {
const missingPolicies = getPoliciesWithoutConnectors(
[githubPackagePolicy, sharepointPackagePolicy, mysqlPackagePolicy],
[githubConnector, sharepointConnector, mysqlConnector]
);
expect(missingPolicies.length).toBe(0);
});
test('Returns all policies if all are missing', async () => {
const missingPolicies = getPoliciesWithoutConnectors(
[githubPackagePolicy, sharepointPackagePolicy, mysqlPackagePolicy],
[]
);
expect(missingPolicies.length).toBe(3);
expect(missingPolicies).toContain(githubPackagePolicy);
expect(missingPolicies).toContain(sharepointPackagePolicy);
expect(missingPolicies).toContain(mysqlPackagePolicy);
});
});
describe('getConnectorsWithoutPolicies', () => {
test('Returns a missing policy if one is missing', async () => {
const missingConnectors = getConnectorsWithoutPolicies(
[githubPackagePolicy, sharepointPackagePolicy],
[githubConnector, sharepointConnector, mysqlConnector]
);
expect(missingConnectors.length).toBe(1);
expect(missingConnectors).toContain(mysqlConnector);
});
test('Returns empty array if no policies are missing', async () => {
const missingConnectors = getConnectorsWithoutPolicies(
[githubPackagePolicy, sharepointPackagePolicy, mysqlPackagePolicy],
[githubConnector, sharepointConnector, mysqlConnector]
);
expect(missingConnectors.length).toBe(0);
});
test('Returns all policies if all are missing', async () => {
const missingConnectors = getConnectorsWithoutPolicies(
[],
[githubConnector, sharepointConnector, mysqlConnector]
);
expect(missingConnectors.length).toBe(3);
expect(missingConnectors).toContain(githubConnector);
expect(missingConnectors).toContain(sharepointConnector);
expect(missingConnectors).toContain(mysqlConnector);
});
});
});

View file

@ -0,0 +1,253 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { PACKAGE_POLICY_SAVED_OBJECT_TYPE, PackagePolicy } from '@kbn/fleet-plugin/common';
import { AgentPolicyServiceInterface, PackagePolicyClient } from '@kbn/fleet-plugin/server';
import type { Logger, SavedObjectsClientContract } from '@kbn/core/server';
import { NATIVE_CONNECTOR_DEFINITIONS, fetchConnectors } from '@kbn/search-connectors';
import { getPackageInfo } from '@kbn/fleet-plugin/server/services/epm/packages';
export interface ConnectorMetadata {
id: string;
name: string;
service_type: string;
}
export interface PackagePolicyMetadata {
package_policy_id: string;
agent_policy_ids: string[];
connector_metadata: ConnectorMetadata;
}
const connectorsInputName = 'connectors-py';
const pkgName = 'elastic_connectors';
const pkgTitle = 'Elastic Connectors';
export class AgentlessConnectorsInfraService {
private logger: Logger;
private soClient: SavedObjectsClientContract;
private esClient: ElasticsearchClient;
private packagePolicyService: PackagePolicyClient;
private agentPolicyService: AgentPolicyServiceInterface;
constructor(
soClient: SavedObjectsClientContract,
esClient: ElasticsearchClient,
packagePolicyService: PackagePolicyClient,
agentPolicyService: AgentPolicyServiceInterface,
logger: Logger
) {
this.logger = logger;
this.soClient = soClient;
this.esClient = esClient;
this.packagePolicyService = packagePolicyService;
this.agentPolicyService = agentPolicyService;
}
public getNativeConnectors = async (): Promise<ConnectorMetadata[]> => {
this.logger.debug(`Fetching all connectors and filtering only to native`);
const nativeConnectors: ConnectorMetadata[] = [];
const allConnectors = await fetchConnectors(this.esClient);
for (const connector of allConnectors) {
if (connector.is_native && connector.service_type != null) {
if (NATIVE_CONNECTOR_DEFINITIONS[connector.service_type] == null) {
this.logger.debug(
`Skipping connector ${connector.id}: unsupported service type ${connector.service_type}`
);
continue;
}
nativeConnectors.push({
id: connector.id,
name: connector.name,
service_type: connector.service_type,
});
}
}
return nativeConnectors;
};
public getConnectorPackagePolicies = async (): Promise<PackagePolicyMetadata[]> => {
const kuery = `${PACKAGE_POLICY_SAVED_OBJECT_TYPE}.package.name:${pkgName}`;
this.logger.debug(`Fetching policies with kuery: "${kuery}"`);
const policiesIterator = await this.packagePolicyService.fetchAllItems(this.soClient, {
perPage: 50,
kuery,
});
const policiesMetadata: PackagePolicyMetadata[] = [];
for await (const policyPage of policiesIterator) {
for (const policy of policyPage) {
for (const input of policy.inputs) {
if (input.type === connectorsInputName) {
if (input.compiled_input != null) {
if (input.compiled_input.service_type == null) {
this.logger.debug(`Policy ${policy.id} is missing service_type, skipping`);
continue;
}
if (input.compiled_input.connector_id == null) {
this.logger.debug(`Policy ${policy.id} is missing connector_id, skipping`);
continue;
}
if (input.compiled_input.connector_name == null) {
this.logger.debug(`Policy ${policy.id} is missing connector_name`);
// No need to skip, that's fine
}
// TODO: We manage all policies here, not only agentless.
// Return this code back once this logic is ironed out
// if (policy.supports_agentless !== true) {
// this.logger.debug(`Policy ${policy.id} does not support agentless, skipping`);
// continue;
// }
policiesMetadata.push({
package_policy_id: policy.id,
agent_policy_ids: policy.policy_ids,
connector_metadata: {
id: input.compiled_input.connector_id,
name: input.compiled_input.connector_name || '',
service_type: input.compiled_input.service_type,
},
});
}
}
}
}
}
return policiesMetadata;
};
public deployConnector = async (connector: ConnectorMetadata): Promise<PackagePolicy> => {
this.logger.info(
`Connector ${connector.id} has no integration policy associated with it, creating`
);
if (connector.id == null || connector.id.trim().length === 0) {
throw new Error(`Connector id is null or empty`);
}
if (connector.service_type == null || connector.service_type.trim().length === 0) {
throw new Error(`Connector ${connector.id} service_type is null or empty`);
}
if (NATIVE_CONNECTOR_DEFINITIONS[connector.service_type] == null) {
throw new Error(
`Connector ${connector.id} service_type is incompatible with agentless or unsupported`
);
}
this.logger.debug(`Getting package version for connectors package ${pkgName}`);
const pkgVersion = await this.getPackageVersion();
this.logger.debug(`Latest package version for ${pkgName} is ${pkgVersion}`);
const createdPolicy = await this.agentPolicyService.create(this.soClient, this.esClient, {
name: `${connector.service_type} connector: ${connector.id}`,
description: `Automatically generated on ${new Date(Date.now()).toISOString()}`,
namespace: 'default',
monitoring_enabled: ['logs', 'metrics'],
inactivity_timeout: 1209600,
is_protected: false,
supports_agentless: true,
});
this.logger.info(
`Successfully created agent policy ${createdPolicy.id} for agentless connector ${connector.id}`
);
this.logger.debug(`Creating a package policy for agentless connector ${connector.id}`);
const packagePolicy = await this.packagePolicyService.create(this.soClient, this.esClient, {
policy_ids: [createdPolicy.id],
package: {
title: pkgTitle,
name: pkgName,
version: pkgVersion,
},
name: `${connector.service_type} connector ${connector.id}`,
description: '',
namespace: '',
enabled: true,
inputs: [
{
type: connectorsInputName,
enabled: true,
vars: {
connector_id: { type: 'string', value: connector.id },
connector_name: { type: 'string', value: connector.name },
service_type: { type: 'string', value: connector.service_type },
},
streams: [],
},
],
});
this.logger.info(
`Successfully created package policy ${packagePolicy.id} for agentless connector ${connector.id}`
);
return packagePolicy;
};
public removeDeployment = async (packagePolicyId: string): Promise<void> => {
this.logger.info(`Deleting package policy ${packagePolicyId}`);
const policy = await this.packagePolicyService.get(this.soClient, packagePolicyId);
if (policy == null) {
throw new Error(`Failed to delete policy ${packagePolicyId}: not found`);
}
await this.packagePolicyService.delete(this.soClient, this.esClient, [policy.id]);
this.logger.debug(`Deleting package policies with ids ${policy.policy_ids}`);
// TODO: can we do it in one go?
// Why not use deleteFleetServerPoliciesForPolicyId?
for (const agentPolicyId of policy.policy_ids) {
this.logger.info(`Deleting agent policy ${agentPolicyId}`);
await this.agentPolicyService.delete(this.soClient, this.esClient, agentPolicyId);
}
};
private getPackageVersion = async (): Promise<string> => {
this.logger.debug(`Fetching ${pkgName} version`);
// This will raise an error if package is not there.
// Situation is exceptional, so we can just show
// the error message from getPackageInfo in this case
const packageInfo = await getPackageInfo({
savedObjectsClient: this.soClient,
pkgName,
pkgVersion: '',
skipArchive: true,
ignoreUnverified: true,
prerelease: true,
});
this.logger.debug(`Found ${pkgName} version: ${packageInfo.version}`);
return packageInfo.version;
};
}
export const getConnectorsWithoutPolicies = (
packagePolicies: PackagePolicyMetadata[],
connectors: ConnectorMetadata[]
): ConnectorMetadata[] => {
return connectors.filter(
(x) => packagePolicies.filter((y) => y.connector_metadata.id === x.id).length === 0
);
};
export const getPoliciesWithoutConnectors = (
packagePolicies: PackagePolicyMetadata[],
connectors: ConnectorMetadata[]
): PackagePolicyMetadata[] => {
return packagePolicies.filter(
(x) => connectors.filter((y) => y.id === x.connector_metadata.id).length === 0
);
};

View file

@ -0,0 +1,241 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { loggerMock, MockedLogger } from '@kbn/logging-mocks';
import { infraSyncTaskRunner } from './task';
import { ConcreteTaskInstance, TaskStatus } from '@kbn/task-manager-plugin/server';
import {
AgentlessConnectorsInfraService,
ConnectorMetadata,
PackagePolicyMetadata,
} from './services';
import { licensingMock } from '@kbn/licensing-plugin/server/mocks';
import { LicensingPluginStart } from '@kbn/licensing-plugin/server';
import { createPackagePolicyMock } from '@kbn/fleet-plugin/common/mocks';
const DATE_1970 = '1970-01-01T00:00:00.000Z';
describe('infraSyncTaskRunner', () => {
const githubConnector: ConnectorMetadata = {
id: '000001',
name: 'Github Connector',
service_type: 'github',
};
const sharepointConnector: ConnectorMetadata = {
id: '000002',
name: 'Sharepoint Connector',
service_type: 'sharepoint_online',
};
const mysqlConnector: ConnectorMetadata = {
id: '000003',
name: 'MySQL Connector',
service_type: 'mysql',
};
const githubPackagePolicy: PackagePolicyMetadata = {
package_policy_id: 'agent-001',
agent_policy_ids: ['agent-package-001'],
connector_metadata: githubConnector,
};
const sharepointPackagePolicy: PackagePolicyMetadata = {
package_policy_id: 'agent-002',
agent_policy_ids: ['agent-package-002'],
connector_metadata: sharepointConnector,
};
const mysqlPackagePolicy: PackagePolicyMetadata = {
package_policy_id: 'agent-003',
agent_policy_ids: ['agent-package-003'],
connector_metadata: mysqlConnector,
};
let logger: MockedLogger;
let serviceMock: jest.Mocked<AgentlessConnectorsInfraService>;
let licensePluginStartMock: jest.Mocked<LicensingPluginStart>;
const taskInstanceStub: ConcreteTaskInstance = {
id: '',
attempts: 0,
status: TaskStatus.Running,
version: '123',
runAt: new Date(),
scheduledAt: new Date(),
startedAt: new Date(DATE_1970),
retryAt: new Date(Date.now() + 5 * 60 * 1000),
state: {},
taskType: 'backfill',
timeoutOverride: '3m',
params: {
adHocRunParamsId: 'abc',
spaceId: 'default',
},
ownerId: null,
};
const invalidLicenseMock = licensingMock.createLicenseMock();
invalidLicenseMock.check.mockReturnValue({ state: 'invalid' });
const validLicenseMock = licensingMock.createLicenseMock();
validLicenseMock.check.mockReturnValue({ state: 'valid' });
beforeAll(async () => {
logger = loggerMock.create();
serviceMock = {
getNativeConnectors: jest.fn(),
getConnectorPackagePolicies: jest.fn(),
deployConnector: jest.fn(),
removeDeployment: jest.fn(),
} as unknown as jest.Mocked<AgentlessConnectorsInfraService>;
licensePluginStartMock = {
getLicense: jest.fn(),
} as unknown as jest.Mocked<LicensingPluginStart>;
});
beforeEach(() => {
jest.clearAllMocks();
});
test('Does nothing if no connectors or policies are configured', async () => {
await infraSyncTaskRunner(
logger,
serviceMock,
licensePluginStartMock
)({ taskInstance: taskInstanceStub }).run();
expect(serviceMock.deployConnector).not.toBeCalled();
expect(serviceMock.removeDeployment).not.toBeCalled();
});
test('Does nothing if connectors or policies requires deployment but license is not supported', async () => {
serviceMock.getNativeConnectors.mockResolvedValue([mysqlConnector, githubConnector]);
serviceMock.getConnectorPackagePolicies.mockResolvedValue([sharepointPackagePolicy]);
licensePluginStartMock.getLicense.mockResolvedValue(invalidLicenseMock);
await infraSyncTaskRunner(
logger,
serviceMock,
licensePluginStartMock
)({ taskInstance: taskInstanceStub }).run();
expect(serviceMock.deployConnector).not.toBeCalled();
expect(serviceMock.removeDeployment).not.toBeCalled();
expect(logger.warn).toBeCalledWith(expect.stringMatching(/.*not compatible.*/));
expect(logger.warn).toBeCalledWith(expect.stringMatching(/.*license.*/));
});
test('Does nothing if all connectors and package policies are in-sync', async () => {
serviceMock.getNativeConnectors.mockResolvedValue([
mysqlConnector,
githubConnector,
sharepointConnector,
]);
serviceMock.getConnectorPackagePolicies.mockResolvedValue([
mysqlPackagePolicy,
githubPackagePolicy,
sharepointPackagePolicy,
]);
licensePluginStartMock.getLicense.mockResolvedValue(validLicenseMock);
await infraSyncTaskRunner(
logger,
serviceMock,
licensePluginStartMock
)({ taskInstance: taskInstanceStub }).run();
expect(serviceMock.deployConnector).not.toBeCalled();
expect(serviceMock.removeDeployment).not.toBeCalled();
expect(logger.warn).not.toBeCalled();
});
test('Deploys connectors if no policies has been created for these connectors', async () => {
serviceMock.getNativeConnectors.mockResolvedValue([mysqlConnector, githubConnector]);
serviceMock.getConnectorPackagePolicies.mockResolvedValue([sharepointPackagePolicy]);
licensePluginStartMock.getLicense.mockResolvedValue(validLicenseMock);
await infraSyncTaskRunner(
logger,
serviceMock,
licensePluginStartMock
)({ taskInstance: taskInstanceStub }).run();
expect(serviceMock.deployConnector).toBeCalledWith(mysqlConnector);
expect(serviceMock.deployConnector).toBeCalledWith(githubConnector);
});
test('Deploys connectors even if another connectors failed to be deployed', async () => {
serviceMock.getNativeConnectors.mockResolvedValue([
mysqlConnector,
githubConnector,
sharepointConnector,
]);
serviceMock.getConnectorPackagePolicies.mockResolvedValue([]);
licensePluginStartMock.getLicense.mockResolvedValue(validLicenseMock);
serviceMock.deployConnector.mockImplementation(async (connector) => {
if (connector === mysqlConnector || connector === githubConnector) {
throw new Error('Cannot deploy these connectors');
}
return createPackagePolicyMock();
});
await infraSyncTaskRunner(
logger,
serviceMock,
licensePluginStartMock
)({ taskInstance: taskInstanceStub }).run();
expect(serviceMock.deployConnector).toBeCalledWith(mysqlConnector);
expect(serviceMock.deployConnector).toBeCalledWith(githubConnector);
expect(serviceMock.deployConnector).toBeCalledWith(sharepointConnector);
});
test('Removes a package policy if no connectors match the policy', async () => {
serviceMock.getNativeConnectors.mockResolvedValue([mysqlConnector, githubConnector]);
serviceMock.getConnectorPackagePolicies.mockResolvedValue([sharepointPackagePolicy]);
licensePluginStartMock.getLicense.mockResolvedValue(validLicenseMock);
await infraSyncTaskRunner(
logger,
serviceMock,
licensePluginStartMock
)({ taskInstance: taskInstanceStub }).run();
expect(serviceMock.removeDeployment).toBeCalledWith(sharepointPackagePolicy.package_policy_id);
});
test('Removes deployments even if another connectors failed to be undeployed', async () => {
serviceMock.getNativeConnectors.mockResolvedValue([]);
serviceMock.getConnectorPackagePolicies.mockResolvedValue([
sharepointPackagePolicy,
mysqlPackagePolicy,
githubPackagePolicy,
]);
licensePluginStartMock.getLicense.mockResolvedValue(validLicenseMock);
serviceMock.removeDeployment.mockImplementation(async (policyId) => {
if (
policyId === sharepointPackagePolicy.package_policy_id ||
policyId === mysqlPackagePolicy.package_policy_id
) {
throw new Error('Cannot deploy these connectors');
}
});
await infraSyncTaskRunner(
logger,
serviceMock,
licensePluginStartMock
)({ taskInstance: taskInstanceStub }).run();
expect(serviceMock.removeDeployment).toBeCalledWith(sharepointPackagePolicy.package_policy_id);
expect(serviceMock.removeDeployment).toBeCalledWith(mysqlPackagePolicy.package_policy_id);
expect(serviceMock.removeDeployment).toBeCalledWith(githubPackagePolicy.package_policy_id);
});
});

View file

@ -0,0 +1,196 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Logger, CoreStart, SavedObjectsClient } from '@kbn/core/server';
import type {
ConcreteTaskInstance,
TaskManagerStartContract,
TaskInstance,
} from '@kbn/task-manager-plugin/server';
import { LicensingPluginStart } from '@kbn/licensing-plugin/server';
import type {
SearchConnectorsPluginStartDependencies,
SearchConnectorsPluginSetupDependencies,
} from './types';
import {
AgentlessConnectorsInfraService,
getConnectorsWithoutPolicies,
getPoliciesWithoutConnectors,
} from './services';
import { SearchConnectorsConfig } from './config';
const AGENTLESS_CONNECTOR_DEPLOYMENTS_SYNC_TASK_ID = 'search:agentless-connectors-manager-task';
const AGENTLESS_CONNECTOR_DEPLOYMENTS_SYNC_TASK_TYPE = 'search:agentless-connectors-manager';
const SCHEDULE = { interval: '1m' };
export function infraSyncTaskRunner(
logger: Logger,
service: AgentlessConnectorsInfraService,
licensingPluginStart: LicensingPluginStart
) {
return ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => {
return {
run: async () => {
try {
// We fetch some info even if license does not permit actual operations.
// This is done so that we could give a warning to the user only
// if they are actually using the feature.
logger.debug('Checking state of connectors and agentless policies');
// Fetch connectors
const nativeConnectors = await service.getNativeConnectors();
const policiesMetadata = await service.getConnectorPackagePolicies();
// Check license if any native connectors or agentless policies found
if (nativeConnectors.length > 0 || policiesMetadata.length > 0) {
const license = await licensingPluginStart.getLicense();
if (license.check('fleet', 'platinum').state !== 'valid') {
logger.warn(
'Current license is not compatible with agentless connectors. Please upgrade the license to at least platinum'
);
return;
}
}
// Deploy Policies
const connectorsWithoutPolicies = getConnectorsWithoutPolicies(
policiesMetadata,
nativeConnectors
);
let agentlessConnectorsDeployed = 0;
for (const connectorMetadata of connectorsWithoutPolicies) {
// We try-catch to still be able to deploy other connectors if some fail
try {
await service.deployConnector(connectorMetadata);
agentlessConnectorsDeployed += 1;
} catch (e) {
logger.warn(
`Error creating an agentless deployment for connector ${connectorMetadata.id}: ${e.message}`
);
}
}
// Delete policies
const policiesWithoutConnectors = getPoliciesWithoutConnectors(
policiesMetadata,
nativeConnectors
);
let agentlessConnectorsRemoved = 0;
for (const policyMetadata of policiesWithoutConnectors) {
// We try-catch to still be able to deploy other connectors if some fail
try {
await service.removeDeployment(policyMetadata.package_policy_id);
agentlessConnectorsRemoved += 1;
} catch (e) {
logger.warn(
`Error when deleting a package policy ${policyMetadata.package_policy_id}: ${e.message}`
);
}
}
return {
state: {},
schedule: SCHEDULE,
};
} catch (e) {
logger.warn(`Error executing agentless deployment sync task: ${e.message}`);
return {
state: {},
schedule: SCHEDULE,
};
}
},
cancel: async () => {
logger.warn('timed out');
},
};
};
}
export class AgentlessConnectorDeploymentsSyncService {
private logger: Logger;
constructor(logger: Logger) {
this.logger = logger;
}
public registerInfraSyncTask(
plugins: SearchConnectorsPluginSetupDependencies,
coreStart: CoreStart,
searchConnectorsPluginStartDependencies: SearchConnectorsPluginStartDependencies
) {
const taskManager = plugins.taskManager;
const esClient = coreStart.elasticsearch.client.asInternalUser;
const savedObjects = coreStart.savedObjects;
const agentPolicyService = searchConnectorsPluginStartDependencies.fleet.agentPolicyService;
const packagePolicyService = searchConnectorsPluginStartDependencies.fleet.packagePolicyService;
const soClient = new SavedObjectsClient(savedObjects.createInternalRepository());
const service = new AgentlessConnectorsInfraService(
soClient,
esClient,
packagePolicyService,
agentPolicyService,
this.logger
);
taskManager.registerTaskDefinitions({
[AGENTLESS_CONNECTOR_DEPLOYMENTS_SYNC_TASK_TYPE]: {
title: 'Agentless Connector Deployment Manager',
description:
'This task peridocally checks native connectors, agent policies and syncs them if they are out of sync',
timeout: '1m',
maxAttempts: 3,
createTaskRunner: infraSyncTaskRunner(
this.logger,
service,
searchConnectorsPluginStartDependencies.licensing
),
},
});
}
public async scheduleInfraSyncTask(
config: SearchConnectorsConfig,
taskManager: TaskManagerStartContract
): Promise<TaskInstance | null> {
this.logger.info(`Scheduling ${AGENTLESS_CONNECTOR_DEPLOYMENTS_SYNC_TASK_ID}`);
try {
await taskManager.removeIfExists(AGENTLESS_CONNECTOR_DEPLOYMENTS_SYNC_TASK_ID);
const taskInstance = await taskManager.ensureScheduled({
id: AGENTLESS_CONNECTOR_DEPLOYMENTS_SYNC_TASK_ID,
taskType: AGENTLESS_CONNECTOR_DEPLOYMENTS_SYNC_TASK_TYPE,
schedule: SCHEDULE,
params: {},
state: {},
scope: ['search', 'connectors'],
});
this.logger.info(
`Scheduled ${AGENTLESS_CONNECTOR_DEPLOYMENTS_SYNC_TASK_ID} with interval ${taskInstance.schedule?.interval}`
);
return taskInstance;
} catch (e) {
this.logger.error(
`Error scheduling ${AGENTLESS_CONNECTOR_DEPLOYMENTS_SYNC_TASK_ID}, received ${e.message}`
);
return null;
}
}
}

View file

@ -6,6 +6,13 @@
*/
import { ConnectorServerSideDefinition } from '@kbn/search-connectors';
import type { FleetStartContract, FleetSetupContract } from '@kbn/fleet-plugin/server';
import {
TaskManagerSetupContract,
TaskManagerStartContract,
} from '@kbn/task-manager-plugin/server';
import { SavedObjectsServiceSetup, SavedObjectsServiceStart } from '@kbn/core-saved-objects-server';
import { LicensingPluginStart } from '@kbn/licensing-plugin/server';
/* eslint-disable @typescript-eslint/no-empty-interface */
@ -15,5 +22,14 @@ export interface SearchConnectorsPluginSetup {
export interface SearchConnectorsPluginStart {}
export interface StartDependencies {}
export interface SetupDependencies {}
export interface SearchConnectorsPluginStartDependencies {
fleet: FleetStartContract;
taskManager: TaskManagerStartContract;
soClient: SavedObjectsServiceStart;
licensing: LicensingPluginStart;
}
export interface SearchConnectorsPluginSetupDependencies {
fleet: FleetSetupContract;
taskManager: TaskManagerSetupContract;
soClient: SavedObjectsServiceSetup;
}

View file

@ -19,5 +19,13 @@
"@kbn/config-schema",
"@kbn/core-http-browser",
"@kbn/search-connectors",
"@kbn/task-manager-plugin",
"@kbn/fleet-plugin",
"@kbn/core-saved-objects-api-server",
"@kbn/core-elasticsearch-client-server-mocks",
"@kbn/logging-mocks",
"@kbn/core-elasticsearch-server",
"@kbn/licensing-plugin",
"@kbn/core-saved-objects-server",
]
}

View file

@ -161,6 +161,7 @@ export default function ({ getService }: FtrProviderContext) {
'osquery:telemetry-saved-queries',
'report:execute',
'risk_engine:risk_scoring',
'search:agentless-connectors-manager',
'security-solution-ea-asset-criticality-ecs-migration',
'security:endpoint-diagnostics',
'security:endpoint-meta-telemetry',