Add install integrations endpoint (#184167)

Resolves #183472
Resolves #183020

## Summary

1. Decouples integration installation from fleet privilege
2. Creates API endpoint to install all selected integrations as a single
request:

  - Should accept list of selected integrations and log files
- Should respond in a format easy to parse using native bash / standard
unix commands like awk/sed/grep
- Should return unified config for Elastic Agent for all installed
integrations
- ~Should update saved object with selected integrations and
installation status~ Waiting for designs/requirements

## Screenshot

```text
curl --request POST \
    --url "598bc802-0616-47c2-8895-c9cc24b959dd/integrations/install" \
    --header "Authorization: ApiKey emRMWHBvOEJOMmJEaFRKNnN4LS06SVJwcldSTkxTUjZtU1VpNXRLU2ZBdw==" \
    --header "Content-Type: text/tab-separated-values" \
    --data $'system\tregistry\nproduct_service\tcustom\t/path/to/access.log\ncheckout_service\tcustom\t/path/to/access.log\ncheckout_service\tcustom\t/path/to/error.log'

outputs:
  default:
    type: elasticsearch
    hosts:
      - 'http://localhost:9200'
    api_key: 'zdLXpo8BN2bDhTJ6sx--:IRprWRNLSR6mSUi5tKSfAw'
inputs:
  - id: logfile-system.auth-96f640d3-2365-4008-b634-dcbe8278b583
    data_stream:
      dataset: system.auth
      type: logs
    paths:
      - /var/log/auth.log*
      - /var/log/secure*
    exclude_files:
      - .gz$
    multiline:
      pattern: ^s
      match: after
    tags:
      - system-auth
    processors:
      - add_locale: null
  - id: logfile-system.syslog-96f640d3-2365-4008-b634-dcbe8278b583
    data_stream:
      dataset: system.syslog
      type: logs
    paths:
      - /var/log/messages*
      - /var/log/syslog*
      - /var/log/system*
    exclude_files:
      - .gz$
    multiline:
      pattern: ^s
      match: after
    processors:
      - add_locale: null
  - id: custom-logs-4e07e609-ba8e-4dbe-9490-0b4aaf9e637b
    type: logfile
    data_stream:
      namespace: default
    streams:
      - id: logs-onboarding-product_service
        data_stream:
          dataset: product_service
        paths:
          - /path/to/access.log
  - id: custom-logs-c665eb58-effe-4530-be01-8b510f969140
    type: logfile
    data_stream:
      namespace: default
    streams:
      - id: logs-onboarding-checkout_service
        data_stream:
          dataset: checkout_service
        paths:
          - /path/to/access.log
          - /path/to/error.log
```

<img width="1228" alt="Screenshot 2024-05-23 at 20 05 59"
src="c2759491-b9ae-4b89-8f24-e196708d76f6">

## Testing

1. Start a Quickstart onboarding flow and copy the onboarding ID from
DEV tools network tab
2. Create a new API key
3. Run the following curl:

```bash
curl --request POST \
  --url "http://localhost:5601/internal/observability_onboarding/flow/${ONBOARDING_ID}/integrations/install" \
  --header "Authorization: ApiKey ${ENCODED_API_KEY}" \
  --header "Content-Type: text/tab-separated-values" \
  --data $'system\tregistry\nproduct_service\tcustom\t/path/to/access.log\ncheckout_service\tcustom\t/path/to/access.log'
```

4. Go to installed integrations page
5. You should see the the system integration and 2 custom integrations
installed.
This commit is contained in:
Thom Heymann 2024-05-29 18:04:24 +01:00 committed by GitHub
parent a5613515ee
commit 5715ee8347
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 302 additions and 9 deletions

View file

@ -14,7 +14,7 @@ export const routeValidationObject = {
// if any validation is defined. Not having validation currently
// means we don't get the payload. See
// https://github.com/elastic/kibana/issues/50179
body: schema.nullable(anyObject),
body: schema.nullable(schema.oneOf([anyObject, schema.string()])),
params: anyObject,
query: anyObject,
};

View file

@ -20,6 +20,7 @@ export type {
MessageSigningServiceInterface,
} from './services';
export { getRegistryUrl } from './services';
export { NamingCollisionError } from './services/epm/packages/custom_integrations/validation/check_naming_collision';
export type { FleetSetupContract, FleetSetupDeps, FleetStartContract } from './plugin';
export type {

View file

@ -11,6 +11,7 @@ const createClientMock = (): jest.Mocked<PackageClient> => ({
getInstallation: jest.fn(),
ensureInstalledPackage: jest.fn(),
installPackage: jest.fn(),
installCustomIntegration: jest.fn(),
fetchFindLatestPackage: jest.fn(),
readBundledPackage: jest.fn(),
getPackage: jest.fn(),

View file

@ -29,6 +29,7 @@ import type {
Installation,
RegistryPackage,
} from '../../types';
import type { FleetAuthzRouteConfig } from '../security/types';
import { checkSuperuser, doesNotHaveRequiredFleetAuthz, getAuthzFromRequest } from '../security';
import { FleetError, FleetUnauthorizedError, PackageNotFoundError } from '../../errors';
@ -36,6 +37,10 @@ import { INSTALL_PACKAGES_AUTHZ, READ_PACKAGE_INFO_AUTHZ } from '../../routes/ep
import type { InstallResult } from '../../../common';
import { appContextService } from '..';
import type { CustomPackageDatasetConfiguration } from './packages/install';
import type { FetchFindLatestPackageOptions } from './registry';
import * as Registry from './registry';
import { fetchFindLatestPackageOrThrow, getPackage } from './registry';
@ -60,14 +65,22 @@ export interface PackageClient {
pkgVersion?: string;
spaceId?: string;
force?: boolean;
}): Promise<Installation | undefined>;
}): Promise<Installation>;
installPackage(options: {
pkgName: string;
pkgVersion?: string;
spaceId?: string;
force?: boolean;
}): Promise<InstallResult | undefined>;
}): Promise<InstallResult>;
installCustomIntegration(options: {
pkgName: string;
kibanaVersion?: string;
force?: boolean;
spaceId?: string;
datasets: CustomPackageDatasetConfiguration[];
}): Promise<InstallResult>;
fetchFindLatestPackage(
packageName: string,
@ -167,7 +180,7 @@ class PackageClientImpl implements PackageClient {
pkgVersion?: string;
spaceId?: string;
force?: boolean;
}): Promise<Installation | undefined> {
}): Promise<Installation> {
await this.#runPreflight(INSTALL_PACKAGES_AUTHZ);
return ensureInstalledPackage({
@ -176,12 +189,13 @@ class PackageClientImpl implements PackageClient {
savedObjectsClient: this.internalSoClient,
});
}
public async installPackage(options: {
pkgName: string;
pkgVersion?: string;
spaceId?: string;
force?: boolean;
}): Promise<InstallResult | undefined> {
}): Promise<InstallResult> {
await this.#runPreflight(INSTALL_PACKAGES_AUTHZ);
const { pkgName, pkgVersion, spaceId = DEFAULT_SPACE_ID, force = false } = options;
@ -203,6 +217,37 @@ class PackageClientImpl implements PackageClient {
});
}
public async installCustomIntegration(options: {
pkgName: string;
kibanaVersion?: string;
force?: boolean | undefined;
spaceId?: string | undefined;
datasets: CustomPackageDatasetConfiguration[];
}): Promise<InstallResult> {
await this.#runPreflight(INSTALL_PACKAGES_AUTHZ);
const {
pkgName,
kibanaVersion = appContextService.getKibanaVersion(),
datasets,
spaceId = DEFAULT_SPACE_ID,
force = false,
} = options;
return await installPackage({
force,
pkgName,
kibanaVersion,
datasets,
spaceId,
installSource: 'custom',
esClient: this.internalEsClient,
savedObjectsClient: this.internalSoClient,
neverIgnoreVerificationError: !force,
authorizationHeader: this.getAuthorizationHeader(),
});
}
public async fetchFindLatestPackage(
packageName: string,
options?: FetchFindLatestPackageOptions

View file

@ -7,6 +7,13 @@
import Boom from '@hapi/boom';
import * as t from 'io-ts';
import {
NamingCollisionError,
FleetUnauthorizedError,
type PackageClient,
} from '@kbn/fleet-plugin/server';
import { v4 as uuidv4 } from 'uuid';
import { dump } from 'js-yaml';
import { getObservabilityOnboardingFlow, saveObservabilityOnboardingFlow } from '../../lib/state';
import {
ElasticAgentStepPayload,
@ -14,10 +21,13 @@ import {
} from '../../saved_objects/observability_onboarding_status';
import { createObservabilityOnboardingServerRoute } from '../create_observability_onboarding_server_route';
import { getHasLogs } from './get_has_logs';
import { getSystemLogsDataStreams } from '../../../common/elastic_agent_logs';
import { getFallbackESUrl } from '../../lib/get_fallback_urls';
const updateOnboardingFlowRoute = createObservabilityOnboardingServerRoute({
endpoint: 'PUT /internal/observability_onboarding/flow/{onboardingId}',
options: { tags: [] },
options: { tags: [], xsrfRequired: false },
params: t.type({
path: t.type({
onboardingId: t.string,
@ -52,7 +62,7 @@ const updateOnboardingFlowRoute = createObservabilityOnboardingServerRoute({
const stepProgressUpdateRoute = createObservabilityOnboardingServerRoute({
endpoint: 'POST /internal/observability_onboarding/flow/{id}/step/{name}',
options: { tags: [] },
options: { tags: [], xsrfRequired: false },
params: t.type({
path: t.type({
id: t.string,
@ -114,7 +124,7 @@ const stepProgressUpdateRoute = createObservabilityOnboardingServerRoute({
const getProgressRoute = createObservabilityOnboardingServerRoute({
endpoint: 'GET /internal/observability_onboarding/flow/{onboardingId}/progress',
options: { tags: [] },
options: { tags: [], xsrfRequired: false },
params: t.type({
path: t.type({
onboardingId: t.string,
@ -171,8 +181,230 @@ const getProgressRoute = createObservabilityOnboardingServerRoute({
},
});
/**
* This endpoints installs the requested integrations and returns the corresponding config file for Elastic Agent.
*
* The request/response format is TSV (tab-separated values) to simplify parsing in bash.
*
* Example request:
*
* ```text
* POST /internal/observability_onboarding/flow/${ONBOARDING_ID}/integrations/install
*
* system registry
* product_service custom /path/to/access.log
* product_service custom /path/to/error.log
* checkout_service custom /path/to/access.log
* checkout_service custom /path/to/error.log
* ```
*
* Example curl:
*
* ```bash
* curl --request POST \
* --url "http://localhost:5601/internal/observability_onboarding/flow/${ONBOARDING_ID}/integrations/install" \
* --header "Authorization: ApiKey ${ENCODED_API_KEY}" \
* --header "Content-Type: text/tab-separated-values" \
* --data $'system\tregistry\nproduct_service\tcustom\t/path/to/access.log\ncheckout_service\tcustom\t/path/to/access.log'
* ```
*/
const integrationsInstallRoute = createObservabilityOnboardingServerRoute({
endpoint: 'POST /internal/observability_onboarding/flow/{onboardingId}/integrations/install',
options: { tags: [], xsrfRequired: false },
params: t.type({
path: t.type({
onboardingId: t.string,
}),
body: t.string,
}),
async handler({ context, request, response, params, core, plugins, services }) {
const coreStart = await core.start();
const fleetStart = await plugins.fleet.start();
const savedObjectsClient = coreStart.savedObjects.createInternalRepository();
const packageClient = fleetStart.packageService.asScoped(request);
const savedObservabilityOnboardingState = await getObservabilityOnboardingFlow({
savedObjectsClient,
savedObjectId: params.path.onboardingId,
});
if (!savedObservabilityOnboardingState) {
throw Boom.notFound(`Onboarding session '${params.path.onboardingId}' not found.`);
}
const integrationsToInstall = parseIntegrationsTSV(params.body);
if (!integrationsToInstall.length) {
return response.badRequest({
body: {
message: 'Please specify a list of integrations to install',
},
});
}
await saveObservabilityOnboardingFlow({
savedObjectsClient,
savedObjectId: params.path.onboardingId,
observabilityOnboardingState: {
...savedObservabilityOnboardingState,
type: 'logFiles',
progress: {},
} as ObservabilityOnboardingFlow,
});
let agentInputs: unknown[];
try {
agentInputs = await ensureInstalledIntegrations(integrationsToInstall, packageClient);
} catch (error) {
if (error instanceof FleetUnauthorizedError) {
return response.forbidden({
body: {
message: error.message,
},
});
}
throw error;
}
const elasticsearchUrl = plugins.cloud?.setup?.elasticsearchUrl
? [plugins.cloud?.setup?.elasticsearchUrl]
: await getFallbackESUrl(services.esLegacyConfigService);
return response.ok({
headers: {
'content-type': 'application/yaml',
},
body: generateAgentConfig({
esHost: elasticsearchUrl,
inputs: agentInputs,
}),
});
},
});
type Integration =
| {
pkgName: string;
installSource: 'registry';
}
| {
pkgName: string;
installSource: 'custom';
logFilePaths: string[];
};
async function ensureInstalledIntegrations(
integrationsToInstall: Integration[],
packageClient: PackageClient
) {
const agentInputs: unknown[] = [];
for (const integration of integrationsToInstall) {
const { pkgName, installSource } = integration;
if (installSource === 'registry') {
await packageClient.ensureInstalledPackage({ pkgName });
agentInputs.push(...getSystemLogsDataStreams(uuidv4()));
} else if (installSource === 'custom') {
const input = {
id: `custom-logs-${uuidv4()}`,
type: 'logfile',
data_stream: {
namespace: 'default',
},
streams: [
{
id: `logs-onboarding-${pkgName}`,
data_stream: {
dataset: pkgName,
},
paths: integration.logFilePaths,
},
],
};
try {
await packageClient.installCustomIntegration({
pkgName,
datasets: [{ name: pkgName, type: 'logs' }],
});
agentInputs.push(input);
} catch (error) {
// If the error is a naming collision, we can assume the integration is already installed and treat this step as successful
if (error instanceof NamingCollisionError) {
agentInputs.push(input);
} else {
throw error;
}
}
}
}
return agentInputs;
}
/**
* Parses and validates a TSV (tab-separated values) string of integrations with params.
*
* Returns an object of integrations to install.
*
* Example input:
*
* ```text
* system registry
* product_service custom /path/to/access.log
* product_service custom /path/to/error.log
* checkout_service custom /path/to/access.log
* checkout_service custom /path/to/error.log
* ```
*/
function parseIntegrationsTSV(tsv: string) {
return Object.values(
tsv
.split('\n')
.map((line) => line.split('\t', 3))
.reduce<Record<string, Integration>>((acc, [pkgName, installSource, logFilePath]) => {
if (installSource === 'registry') {
if (logFilePath) {
throw new Error(`Integration '${pkgName}' does not support a file path`);
}
acc[pkgName] = {
pkgName,
installSource,
};
return acc;
} else if (installSource === 'custom') {
if (!logFilePath) {
throw new Error(`Missing file path for integration: ${pkgName}`);
}
// Append file path if integration is already in the list
const existing = acc[pkgName];
if (existing && existing.installSource === 'custom') {
existing.logFilePaths.push(logFilePath);
return acc;
}
acc[pkgName] = {
pkgName,
installSource,
logFilePaths: [logFilePath],
};
return acc;
}
throw new Error(`Invalid install source: ${installSource}`);
}, {})
);
}
const generateAgentConfig = ({ esHost, inputs = [] }: { esHost: string[]; inputs: unknown[] }) => {
return dump({
outputs: {
default: {
type: 'elasticsearch',
hosts: esHost,
api_key: '${API_KEY}', // Placeholder to be replaced by bash script with the actual API key
},
},
inputs,
});
};
export const flowRouteRepository = {
...updateOnboardingFlowRoute,
...stepProgressUpdateRoute,
...getProgressRoute,
...integrationsInstallRoute,
};

View file

@ -6,6 +6,7 @@
*/
import { errors } from '@elastic/elasticsearch';
import Boom from '@hapi/boom';
import type { IKibanaResponse } from '@kbn/core/server';
import { CoreSetup, Logger, RouteRegistrar } from '@kbn/core/server';
import {
ServerRouteRepository,
@ -68,6 +69,7 @@ export function registerRoutes({
const data = (await handler({
context,
request,
response,
logger,
params: decodedParams,
plugins,
@ -87,6 +89,10 @@ export function registerRoutes({
return response.noContent();
}
if (data instanceof response.noContent().constructor) {
return data as IKibanaResponse;
}
return response.ok({ body: data });
} catch (error) {
if (Boom.isBoom(error)) {

View file

@ -4,7 +4,13 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { CoreSetup, CoreStart, KibanaRequest, Logger } from '@kbn/core/server';
import {
CoreSetup,
CoreStart,
KibanaRequest,
KibanaResponseFactory,
Logger,
} from '@kbn/core/server';
import { ObservabilityOnboardingServerRouteRepository } from '.';
import { ObservabilityOnboardingConfig } from '..';
import { EsLegacyConfigService } from '../services/es_legacy_config_service';
@ -20,6 +26,7 @@ export interface ObservabilityOnboardingRouteHandlerResources {
context: ObservabilityOnboardingRequestHandlerContext;
logger: Logger;
request: KibanaRequest;
response: KibanaResponseFactory;
plugins: {
[key in keyof ObservabilityOnboardingPluginSetupDependencies]: {
setup: Required<ObservabilityOnboardingPluginSetupDependencies>[key];
@ -40,5 +47,6 @@ export interface ObservabilityOnboardingRouteHandlerResources {
export interface ObservabilityOnboardingRouteCreateOptions {
options: {
tags: string[];
xsrfRequired?: boolean;
};
}