[8.x] [Fleet] Add option to have Kafka dynamic topics in outputs (#192720) (#193197)

# Backport

This will backport the following commits from `main` to `8.x`:
- [[Fleet] Add option to have Kafka dynamic topics in outputs
(#192720)](https://github.com/elastic/kibana/pull/192720)

<!--- Backport version: 9.4.3 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Cristina
Amico","email":"criamico@users.noreply.github.com"},"sourceCommit":{"committedDate":"2024-09-17T16:13:33Z","message":"[Fleet]
Add option to have Kafka dynamic topics in outputs (#192720)\n\n##
Summary\r\n\r\nAdd option to have Kafka dynamic topics in outputs
settings. It adds\r\nselection with radio buttons under the `topic`
section.\r\n\r\n### UI changes\r\n\r\n![Screenshot 2024-09-13 at 16
32\r\n54](https://github.com/user-attachments/assets/9414f475-c115-4986-a204-eeb79471b7ea)\r\n![Screenshot
2024-09-13 at 16
33\r\n03](https://github.com/user-attachments/assets/ac36b1da-0e81-4f97-9632-ced8b5776af2)\r\n![Screenshot
2024-09-13 at 16
33\r\n12](https://github.com/user-attachments/assets/5ea3cf66-d929-45fb-aa3b-924734c34305)\r\n![Screenshot
2024-09-13 at 16
33\r\n40](https://github.com/user-attachments/assets/62e477c8-5719-41fe-94b1-4bdda0f00007)\r\n\r\n<details>\r\n
<summary>Testing</summary>\r\n\r\n- Create a kafka output, fill the
required fields, select \"static\" topic\r\nand fill it out with
\"default_topic\". Payload will be:\r\n![Screenshot 2024-09-13 at 16
36\r\n50](https://github.com/user-attachments/assets/ea3761b8-39cb-40ce-bf1c-85e3af1f9d4b)\r\n-
The output should be created correctly\r\n\r\n- Create a kafka output,
fill the required fields, select \"dynamic\"\r\ntopic and choose one
from the dropdown. Payload will be:\r\n![Screenshot 2024-09-13 at 16
36\r\n23](https://github.com/user-attachments/assets/8d0d3ce4-5fb0-4c01-a939-b6a9d50da219)\r\n\r\n-
Create a kafka output, fill the required fields, select
\"dynamic\"\r\ntopic and input one not present in the dropdown (I had
\"custom_topic\".\r\nPayload will be like this:\r\n![Screenshot
2024-09-13 at 16
35\r\n52](https://github.com/user-attachments/assets/27347d01-3efc-4c23-85a8-f3287854de4f)\r\n</details>\r\n\r\n-
Try updating an existing Kafka output from one type to another,
it\r\nshould work correctly\r\n\r\n### Checklist\r\n\r\n- [ ] Any text
added follows [EUI's
writing\r\nguidelines](https://elastic.github.io/eui/#/guidelines/writing),
uses\r\nsentence case text and includes
[i18n\r\nsupport](https://github.com/elastic/kibana/blob/main/packages/kbn-i18n/README.md)\r\n-
[
]\r\n[Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html)\r\nwas
added for features that require explanation or tutorials\r\n- [ ] [Unit
or
functional\r\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\r\nwere
updated or added to match the most common
scenarios\r\n\r\n---------\r\n\r\nCo-authored-by: Elastic Machine
<elasticmachine@users.noreply.github.com>","sha":"c0aaada30cf06bfcda3bdea8b67638e2f268b206","branchLabelMapping":{"^v9.0.0$":"main","^v8.16.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["Team:Fleet","v9.0.0","release_note:feature","v8.16.0"],"title":"[Fleet]
Add option to have Kafka dynamic topics in
outputs","number":192720,"url":"https://github.com/elastic/kibana/pull/192720","mergeCommit":{"message":"[Fleet]
Add option to have Kafka dynamic topics in outputs (#192720)\n\n##
Summary\r\n\r\nAdd option to have Kafka dynamic topics in outputs
settings. It adds\r\nselection with radio buttons under the `topic`
section.\r\n\r\n### UI changes\r\n\r\n![Screenshot 2024-09-13 at 16
32\r\n54](https://github.com/user-attachments/assets/9414f475-c115-4986-a204-eeb79471b7ea)\r\n![Screenshot
2024-09-13 at 16
33\r\n03](https://github.com/user-attachments/assets/ac36b1da-0e81-4f97-9632-ced8b5776af2)\r\n![Screenshot
2024-09-13 at 16
33\r\n12](https://github.com/user-attachments/assets/5ea3cf66-d929-45fb-aa3b-924734c34305)\r\n![Screenshot
2024-09-13 at 16
33\r\n40](https://github.com/user-attachments/assets/62e477c8-5719-41fe-94b1-4bdda0f00007)\r\n\r\n<details>\r\n
<summary>Testing</summary>\r\n\r\n- Create a kafka output, fill the
required fields, select \"static\" topic\r\nand fill it out with
\"default_topic\". Payload will be:\r\n![Screenshot 2024-09-13 at 16
36\r\n50](https://github.com/user-attachments/assets/ea3761b8-39cb-40ce-bf1c-85e3af1f9d4b)\r\n-
The output should be created correctly\r\n\r\n- Create a kafka output,
fill the required fields, select \"dynamic\"\r\ntopic and choose one
from the dropdown. Payload will be:\r\n![Screenshot 2024-09-13 at 16
36\r\n23](https://github.com/user-attachments/assets/8d0d3ce4-5fb0-4c01-a939-b6a9d50da219)\r\n\r\n-
Create a kafka output, fill the required fields, select
\"dynamic\"\r\ntopic and input one not present in the dropdown (I had
\"custom_topic\".\r\nPayload will be like this:\r\n![Screenshot
2024-09-13 at 16
35\r\n52](https://github.com/user-attachments/assets/27347d01-3efc-4c23-85a8-f3287854de4f)\r\n</details>\r\n\r\n-
Try updating an existing Kafka output from one type to another,
it\r\nshould work correctly\r\n\r\n### Checklist\r\n\r\n- [ ] Any text
added follows [EUI's
writing\r\nguidelines](https://elastic.github.io/eui/#/guidelines/writing),
uses\r\nsentence case text and includes
[i18n\r\nsupport](https://github.com/elastic/kibana/blob/main/packages/kbn-i18n/README.md)\r\n-
[
]\r\n[Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html)\r\nwas
added for features that require explanation or tutorials\r\n- [ ] [Unit
or
functional\r\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\r\nwere
updated or added to match the most common
scenarios\r\n\r\n---------\r\n\r\nCo-authored-by: Elastic Machine
<elasticmachine@users.noreply.github.com>","sha":"c0aaada30cf06bfcda3bdea8b67638e2f268b206"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/192720","number":192720,"mergeCommit":{"message":"[Fleet]
Add option to have Kafka dynamic topics in outputs (#192720)\n\n##
Summary\r\n\r\nAdd option to have Kafka dynamic topics in outputs
settings. It adds\r\nselection with radio buttons under the `topic`
section.\r\n\r\n### UI changes\r\n\r\n![Screenshot 2024-09-13 at 16
32\r\n54](https://github.com/user-attachments/assets/9414f475-c115-4986-a204-eeb79471b7ea)\r\n![Screenshot
2024-09-13 at 16
33\r\n03](https://github.com/user-attachments/assets/ac36b1da-0e81-4f97-9632-ced8b5776af2)\r\n![Screenshot
2024-09-13 at 16
33\r\n12](https://github.com/user-attachments/assets/5ea3cf66-d929-45fb-aa3b-924734c34305)\r\n![Screenshot
2024-09-13 at 16
33\r\n40](https://github.com/user-attachments/assets/62e477c8-5719-41fe-94b1-4bdda0f00007)\r\n\r\n<details>\r\n
<summary>Testing</summary>\r\n\r\n- Create a kafka output, fill the
required fields, select \"static\" topic\r\nand fill it out with
\"default_topic\". Payload will be:\r\n![Screenshot 2024-09-13 at 16
36\r\n50](https://github.com/user-attachments/assets/ea3761b8-39cb-40ce-bf1c-85e3af1f9d4b)\r\n-
The output should be created correctly\r\n\r\n- Create a kafka output,
fill the required fields, select \"dynamic\"\r\ntopic and choose one
from the dropdown. Payload will be:\r\n![Screenshot 2024-09-13 at 16
36\r\n23](https://github.com/user-attachments/assets/8d0d3ce4-5fb0-4c01-a939-b6a9d50da219)\r\n\r\n-
Create a kafka output, fill the required fields, select
\"dynamic\"\r\ntopic and input one not present in the dropdown (I had
\"custom_topic\".\r\nPayload will be like this:\r\n![Screenshot
2024-09-13 at 16
35\r\n52](https://github.com/user-attachments/assets/27347d01-3efc-4c23-85a8-f3287854de4f)\r\n</details>\r\n\r\n-
Try updating an existing Kafka output from one type to another,
it\r\nshould work correctly\r\n\r\n### Checklist\r\n\r\n- [ ] Any text
added follows [EUI's
writing\r\nguidelines](https://elastic.github.io/eui/#/guidelines/writing),
uses\r\nsentence case text and includes
[i18n\r\nsupport](https://github.com/elastic/kibana/blob/main/packages/kbn-i18n/README.md)\r\n-
[
]\r\n[Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html)\r\nwas
added for features that require explanation or tutorials\r\n- [ ] [Unit
or
functional\r\ntests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)\r\nwere
updated or added to match the most common
scenarios\r\n\r\n---------\r\n\r\nCo-authored-by: Elastic Machine
<elasticmachine@users.noreply.github.com>","sha":"c0aaada30cf06bfcda3bdea8b67638e2f268b206"}},{"branch":"8.x","label":"v8.16.0","branchLabelMappingKey":"^v8.16.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->

Co-authored-by: Cristina Amico <criamico@users.noreply.github.com>
This commit is contained in:
Kibana Machine 2024-09-18 03:50:24 +10:00 committed by GitHub
parent fe39f0e5d9
commit 513a12e076
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 461 additions and 79 deletions

View file

@ -68,7 +68,7 @@ pageLoadAssetSize:
files: 22673
filesManagement: 18683
fileUpload: 25664
fleet: 174609
fleet: 190461
globalSearch: 29696
globalSearchBar: 50403
globalSearchProviders: 25554

View file

@ -66,6 +66,11 @@ export const kafkaPartitionType = {
Hash: 'hash',
} as const;
export const kafkaTopicsType = {
Static: 'static',
Dynamic: 'dynamic',
} as const;
export const kafkaTopicWhenType = {
Equals: 'equals',
Contains: 'contains',
@ -133,6 +138,27 @@ export const RESERVED_CONFIG_YML_KEYS = [
'worker',
];
export const kafkaTopicsOptions = [
{
id: kafkaTopicsType.Static,
label: 'Static Topic',
'data-test-subj': 'kafkaTopicStaticRadioButton',
},
{
id: kafkaTopicsType.Dynamic,
label: 'Dynamic Topic',
'data-test-subj': 'kafkaTopicDynamicRadioButton',
},
];
export const KAFKA_DYNAMIC_FIELDS = [
'data_stream.type',
'data_stream.dataset',
'data_stream.namespace',
'@timestamp',
'event.dataset',
];
export const OUTPUT_TYPES_WITH_PRESET_SUPPORT: Array<ValueOf<OutputType>> = [
outputType.Elasticsearch,
outputType.RemoteElasticsearch,

View file

@ -163,7 +163,8 @@ export const SETTINGS_OUTPUTS_KAFKA = {
PARTITIONING_EVENTS_INPUT: 'settingsOutputsFlyout.kafkaPartitionTypeRandomInput',
PARTITIONING_HASH_INPUT: 'settingsOutputsFlyout.kafkaPartitionTypeHashInput',
TOPICS_PANEL: 'settingsOutputsFlyout.kafkaTopicsPanel',
TOPICS_DEFAULT_TOPIC_INPUT: 'settingsOutputsFlyout.kafkaDefaultTopicInput',
TOPICS_DEFAULT_TOPIC_INPUT: 'settingsOutputsFlyout.kafkaStaticTopicInput',
TOPICS_DYNAMIC_TOPIC_INPUT: 'settingsOutputsFlyout.kafkaDynamicTopicInput',
HEADERS_PANEL: 'settingsOutputsFlyout.kafkaHeadersPanel',
HEADERS_KEY_INPUT: 'settingsOutputsFlyout.kafkaHeadersKeyInput0',
HEADERS_VALUE_INPUT: 'settingsOutputsFlyout.kafkaHeadersValueInput0',

View file

@ -5,15 +5,93 @@
* 2.0.
*/
import { EuiFieldText, EuiFormRow, EuiPanel, EuiSpacer, EuiTitle } from '@elastic/eui';
import React, { useMemo } from 'react';
import type { EuiComboBoxOptionOption } from '@elastic/eui';
import {
EuiFieldText,
EuiFormRow,
EuiPanel,
EuiSpacer,
EuiTitle,
EuiRadioGroup,
EuiComboBox,
} from '@elastic/eui';
import { FormattedMessage } from '@kbn/i18n-react';
import React from 'react';
import { i18n } from '@kbn/i18n';
import {
kafkaTopicsType,
KAFKA_DYNAMIC_FIELDS,
kafkaTopicsOptions,
} from '../../../../../../../common/constants';
import type { OutputFormInputsType } from './use_output_form';
export const OutputFormKafkaTopics: React.FunctionComponent<{ inputs: OutputFormInputsType }> = ({
inputs,
}) => {
const dynamicOptions: Array<EuiComboBoxOptionOption<string>> = useMemo(() => {
const options = KAFKA_DYNAMIC_FIELDS.map((option) => ({
label: option,
value: option,
}));
return options;
}, []);
const renderTopics = () => {
switch (inputs.kafkaTopicsInput.value) {
case kafkaTopicsType.Static:
return (
<EuiFormRow
fullWidth
label={
<FormattedMessage
id="xpack.fleet.settings.editOutputFlyout.kafkaTopicsDefaultTopicLabel"
defaultMessage="Default topic"
/>
}
{...inputs.kafkaStaticTopicInput.formRowProps}
>
<EuiFieldText
data-test-subj="settingsOutputsFlyout.kafkaStaticTopicInput"
fullWidth
{...inputs.kafkaStaticTopicInput.props}
/>
</EuiFormRow>
);
case kafkaTopicsType.Dynamic:
return (
<EuiFormRow
fullWidth
helpText={i18n.translate(
'xpack.fleet.settings.editOutputFlyout.kafkaDynamicTopicHelptext',
{
defaultMessage:
'Select a topic from the list. If a topic is not available, create a custom one.',
}
)}
label={
<FormattedMessage
id="xpack.fleet.settings.editOutputFlyout.kafkaDynamicTopicLabel"
defaultMessage="Topic from field"
/>
}
{...inputs.kafkaDynamicTopicInput.formRowProps}
>
<EuiComboBox
data-test-subj="settingsOutputsFlyout.kafkaDynamicTopicInput"
fullWidth
isClearable={true}
options={dynamicOptions}
customOptionText="Use custom field (not recommended)"
singleSelection={{ asPlainText: true }}
{...inputs.kafkaDynamicTopicInput.props}
/>
</EuiFormRow>
);
}
};
return (
<EuiPanel
borderRadius="m"
@ -39,14 +117,16 @@ export const OutputFormKafkaTopics: React.FunctionComponent<{ inputs: OutputForm
defaultMessage="Default topic"
/>
}
{...inputs.kafkaDefaultTopicInput.formRowProps}
>
<EuiFieldText
data-test-subj="settingsOutputsFlyout.kafkaDefaultTopicInput"
fullWidth
{...inputs.kafkaDefaultTopicInput.props}
<EuiRadioGroup
style={{ flexDirection: 'row', flexWrap: 'wrap', columnGap: 30 }}
data-test-subj={'editOutputFlyout.kafkaTopicsRadioInput'}
options={kafkaTopicsOptions}
compressed
{...inputs.kafkaTopicsInput.props}
/>
</EuiFormRow>
{renderTopics()}
<EuiSpacer size="m" />
</EuiPanel>

View file

@ -7,6 +7,7 @@
import { i18n } from '@kbn/i18n';
import { safeLoad } from 'js-yaml';
import type { EuiComboBoxOptionOption } from '@elastic/eui';
const toSecretValidator =
(validator: (value: string) => string[] | undefined) =>
@ -307,7 +308,7 @@ export function validateSSLKey(value: string) {
export const validateSSLKeySecret = toSecretValidator(validateSSLKey);
export function validateKafkaDefaultTopic(value: string) {
export function validateKafkaStaticTopic(value: string) {
if (!value || value === '') {
return [
i18n.translate('xpack.fleet.settings.outputForm.kafkaDefaultTopicRequiredMessage', {
@ -317,6 +318,30 @@ export function validateKafkaDefaultTopic(value: string) {
}
}
export function validateDynamicKafkaTopics(value: Array<EuiComboBoxOptionOption<string>>) {
const res = [];
value.forEach((val, idx) => {
if (!val) {
res.push(
i18n.translate('xpack.fleet.settings.outputForm.kafkaTopicFieldRequiredMessage', {
defaultMessage: 'Topic is required',
})
);
}
});
if (value.length === 0) {
res.push(
i18n.translate('xpack.fleet.settings.outputForm.kafkaTopicRequiredMessage', {
defaultMessage: 'Topic is required',
})
);
}
if (res.length) {
return res;
}
}
export function validateKafkaClientId(value: string) {
const regex = /^[A-Za-z0-9._-]+$/;
return regex.test(value)
@ -343,49 +368,6 @@ export function validateKafkaPartitioningGroupEvents(value: string) {
];
}
export function validateKafkaTopics(
topics: Array<{
topic: string;
when?: {
condition?: string;
type?: string;
};
}>
) {
const errors: Array<{
message: string;
index: number;
condition?: boolean;
}> = [];
topics.forEach((topic, index) => {
if (!topic.topic || topic.topic === '') {
errors.push({
message: i18n.translate('xpack.fleet.settings.outputForm.kafkaTopicRequiredMessage', {
defaultMessage: 'Topic is required',
}),
index,
});
}
if (
!topic.when?.condition ||
topic.when.condition === '' ||
topic.when.condition.split(':').length - 1 !== 1
) {
errors.push({
message: i18n.translate('xpack.fleet.settings.outputForm.kafkaTopicConditionRequired', {
defaultMessage: 'Must be a key, value pair i.e. "http.response.code: 200"',
}),
index,
condition: true,
});
}
});
if (errors.length) {
return errors;
}
}
export function validateKafkaHeaders(pairs: Array<{ key: string; value: string }>) {
const errors: Array<{
message: string;

View file

@ -0,0 +1,162 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
extractDefaultDynamicKafkaTopics,
extractDefaultStaticKafkaTopic,
} from './use_output_form';
describe('use_output_form', () => {
describe('extractDefaultDynamicKafkaTopics', () => {
it('should return empty array if not topics are passed', () => {
const res = extractDefaultDynamicKafkaTopics({
type: 'kafka',
name: 'new',
is_default: false,
is_default_monitoring: false,
});
expect(res).toEqual([]);
});
it('should return empty array if topics have length == 0', () => {
const res = extractDefaultDynamicKafkaTopics({
type: 'kafka',
name: 'new',
is_default: false,
is_default_monitoring: false,
topics: [],
});
expect(res).toEqual([]);
});
it('should return empty array if topics do not include %{[', () => {
const res = extractDefaultDynamicKafkaTopics({
type: 'kafka',
name: 'new',
is_default: false,
is_default_monitoring: false,
topics: [{ topic: 'something' }],
});
expect(res).toEqual([]);
});
it('should return options for combobox if topics include %{[', () => {
const res = extractDefaultDynamicKafkaTopics({
type: 'kafka',
name: 'new',
is_default: false,
is_default_monitoring: false,
topics: [{ topic: '%{[default.dataset]}' }],
});
expect(res).toEqual([
{
label: 'default.dataset',
value: 'default.dataset',
},
]);
});
it('should return options for combobox if topics include %{[ and some special characters', () => {
const res = extractDefaultDynamicKafkaTopics({
type: 'kafka',
name: 'new',
is_default: false,
is_default_monitoring: false,
topics: [{ topic: '%{[@timestamp]}' }],
});
expect(res).toEqual([
{
label: '@timestamp',
value: '@timestamp',
},
]);
});
it('should return options for combobox if topics include %{[ and a custom name', () => {
const res = extractDefaultDynamicKafkaTopics({
type: 'kafka',
name: 'new',
is_default: false,
is_default_monitoring: false,
topics: [{ topic: '%{[something]}' }],
});
expect(res).toEqual([
{
label: 'something',
value: 'something',
},
]);
});
});
describe('extractDefaultStaticKafkaTopic', () => {
it('should return empty array if not topics are passed', () => {
const res = extractDefaultStaticKafkaTopic({
type: 'kafka',
name: 'new',
is_default: false,
is_default_monitoring: false,
});
expect(res).toEqual('');
});
it('should return empty array if topics have length == 0', () => {
const res = extractDefaultStaticKafkaTopic({
type: 'kafka',
name: 'new',
is_default: false,
is_default_monitoring: false,
topics: [],
});
expect(res).toEqual('');
});
it('should return empty string if topics include %{[', () => {
const res = extractDefaultStaticKafkaTopic({
type: 'kafka',
name: 'new',
is_default: false,
is_default_monitoring: false,
topics: [{ topic: '%{[something]}' }],
});
expect(res).toEqual('');
});
it('should return the topic if topics field is defined', () => {
const res = extractDefaultStaticKafkaTopic({
type: 'kafka',
name: 'new',
is_default: false,
is_default_monitoring: false,
topics: [{ topic: 'something' }],
});
expect(res).toEqual('something');
});
it('should return the last topic if topics field is defined and has multiple', () => {
const res = extractDefaultStaticKafkaTopic({
type: 'kafka',
name: 'new',
is_default: false,
is_default_monitoring: false,
topics: [{ topic: 'something_1' }, { topic: 'something_2' }, { topic: 'something_3' }],
});
expect(res).toEqual('something_3');
});
});
});

View file

@ -10,6 +10,8 @@ import { useCallback, useState } from 'react';
import { i18n } from '@kbn/i18n';
import { safeLoad } from 'js-yaml';
import type { EuiComboBoxOptionOption } from '@elastic/eui';
import { getDefaultPresetForEsOutput } from '../../../../../../../common/services/output_helpers';
import type {
@ -27,6 +29,7 @@ import {
kafkaConnectionType,
kafkaPartitionType,
kafkaSaslMechanism,
kafkaTopicsType,
kafkaVerificationModes,
outputType,
} from '../../../../../../../common/constants';
@ -45,6 +48,7 @@ import {
sendPutOutput,
useKeyValueInput,
useAuthz,
useComboBoxWithCustomInput,
} from '../../../../hooks';
import type { Output } from '../../../../types';
import { useConfirmModal } from '../../hooks/use_confirm_modal';
@ -65,10 +69,11 @@ import {
validateKafkaPassword,
validateKafkaPasswordSecret,
validateKafkaHeaders,
validateKafkaDefaultTopic,
validateKafkaStaticTopic,
validateKafkaClientId,
validateKafkaHosts,
validateKafkaPartitioningGroupEvents,
validateDynamicKafkaTopics,
} from './output_form_validators';
import { confirmUpdate } from './confirm_update';
@ -116,7 +121,9 @@ export interface OutputFormInputsType {
kafkaPartitionTypeRoundRobinInput: ReturnType<typeof useInput>;
kafkaHeadersInput: ReturnType<typeof useKeyValueInput>;
kafkaClientIdInput: ReturnType<typeof useInput>;
kafkaDefaultTopicInput: ReturnType<typeof useInput>;
kafkaTopicsInput: ReturnType<typeof useRadioInput>;
kafkaStaticTopicInput: ReturnType<typeof useInput>;
kafkaDynamicTopicInput: ReturnType<typeof useComboBoxWithCustomInput>;
kafkaCompressionInput: ReturnType<typeof useSwitchInput>;
kafkaCompressionLevelInput: ReturnType<typeof useInput>;
kafkaCompressionCodecInput: ReturnType<typeof useInput>;
@ -154,6 +161,36 @@ function extractKafkaOutputSecrets(
return Object.keys(secrets).length ? secrets : null;
}
export function extractDefaultStaticKafkaTopic(o: KafkaOutput): string {
if (
!o?.topics ||
o.topics?.length === 0 ||
(o.topics && o?.topics.length > 0 && o.topics[0].topic?.includes('%{['))
) {
return '';
}
const lastTopic = o.topics[o.topics.length - 1].topic;
return lastTopic || '';
}
export function extractDefaultDynamicKafkaTopics(
o: KafkaOutput
): Array<EuiComboBoxOptionOption<string>> {
if (!o?.topics || o.topics?.length === 0 || (o.topics && !o.topics[0]?.topic?.includes('%{['))) {
return [];
}
const matched = o.topics[0].topic.match(/(%\{\[)(\S*)(\]\})/);
const parsed = matched?.length ? matched[2] : '';
return [
{
label: parsed,
value: parsed,
},
];
}
export function useOutputForm(onSucess: () => void, output?: Output, defaultOuput?: Output) {
const fleetStatus = useFleetStatus();
const authz = useAuthz();
@ -333,15 +370,6 @@ export function useOutputForm(onSucess: () => void, output?: Output, defaultOupu
const kafkaOutput = output as KafkaOutput;
const extractDefaultKafkaTopic = (topics?: Array<{ topic: string }>): string => {
if (!topics || topics.length === 0) {
return '';
}
const lastTopic = topics[topics.length - 1].topic;
return lastTopic || '';
};
const kafkaVersionInput = useInput(
kafkaOutput?.version ?? '1.0.0',
undefined,
@ -441,11 +469,26 @@ export function useOutputForm(onSucess: () => void, output?: Output, defaultOupu
isDisabled('partition')
);
const kafkaDefaultTopicInput = useInput(
extractDefaultKafkaTopic(kafkaOutput?.topics),
validateKafkaDefaultTopic,
const kafkaTopicsInput = useRadioInput(
kafkaOutput?.topics && kafkaOutput?.topics[0].topic?.includes('%{[')
? kafkaTopicsType.Dynamic
: kafkaTopicsType.Static,
isDisabled('topics')
);
const kafkaStaticTopicInput = useInput(
extractDefaultStaticKafkaTopic(kafkaOutput),
kafkaTopicsInput.value === kafkaTopicsType.Static ? validateKafkaStaticTopic : undefined,
isDisabled('topics')
);
const kafkaDynamicTopicInput = useComboBoxWithCustomInput(
'kafkaDynamicTopicComboBox',
extractDefaultDynamicKafkaTopics(kafkaOutput),
kafkaTopicsInput.value === kafkaTopicsType.Dynamic ? validateDynamicKafkaTopics : undefined,
isDisabled('topics')
);
const kafkaHeadersInput = useKeyValueInput(
'kafkaHeadersComboBox',
kafkaOutput?.headers ?? [{ key: '', value: '' }],
@ -553,7 +596,9 @@ export function useOutputForm(onSucess: () => void, output?: Output, defaultOupu
kafkaSslCertificateInput,
kafkaSslKeyInput,
kafkaSslKeySecretInput,
kafkaDefaultTopicInput,
kafkaTopicsInput,
kafkaStaticTopicInput,
kafkaDynamicTopicInput,
};
const hasChanged = Object.values(inputs).some((input) => input.hasChanged);
@ -569,7 +614,6 @@ export function useOutputForm(onSucess: () => void, output?: Output, defaultOupu
const kafkaSslCertificateValid = kafkaSslCertificateInput.validate();
const kafkaSslKeyPlainValid = kafkaSslKeyInput.validate();
const kafkaSslKeySecretValid = kafkaSslKeySecretInput.validate();
const kafkaDefaultTopicValid = kafkaDefaultTopicInput.validate();
const kafkaHeadersValid = kafkaHeadersInput.validate();
const logstashHostsValid = logstashHostsInput.validate();
const additionalYamlConfigValid = additionalYamlConfigInput.validate();
@ -582,6 +626,8 @@ export function useOutputForm(onSucess: () => void, output?: Output, defaultOupu
const diskQueuePathValid = diskQueuePathInput.validate();
const partitioningRandomGroupEventsValid = kafkaPartitionTypeRandomInput.validate();
const partitioningRoundRobinGroupEventsValid = kafkaPartitionTypeRoundRobinInput.validate();
const kafkaStaticTopicInputValid = kafkaStaticTopicInput.validate();
const kafkaStaticDynamicTopicInputValid = kafkaDynamicTopicInput.validate();
const kafkaSslKeyValid = kafkaSslKeyInput.value
? kafkaSslKeyPlainValid
@ -611,11 +657,12 @@ export function useOutputForm(onSucess: () => void, output?: Output, defaultOupu
kafkaUsernameValid &&
kafkaPasswordValid &&
kafkaHeadersValid &&
kafkaDefaultTopicValid &&
additionalYamlConfigValid &&
kafkaClientIDValid &&
partitioningRandomGroupEventsValid &&
partitioningRoundRobinGroupEventsValid
partitioningRoundRobinGroupEventsValid &&
kafkaStaticTopicInputValid &&
kafkaStaticDynamicTopicInputValid
);
}
if (isRemoteElasticsearch) {
@ -647,7 +694,6 @@ export function useOutputForm(onSucess: () => void, output?: Output, defaultOupu
kafkaSslCertificateInput,
kafkaSslKeyInput,
kafkaSslKeySecretInput,
kafkaDefaultTopicInput,
kafkaHeadersInput,
logstashHostsInput,
additionalYamlConfigInput,
@ -660,6 +706,8 @@ export function useOutputForm(onSucess: () => void, output?: Output, defaultOupu
diskQueuePathInput,
kafkaPartitionTypeRandomInput,
kafkaPartitionTypeRoundRobinInput,
kafkaStaticTopicInput,
kafkaDynamicTopicInput,
isLogstash,
isKafka,
isRemoteElasticsearch,
@ -824,7 +872,23 @@ export function useOutputForm(onSucess: () => void, output?: Output, defaultOupu
},
}
: {}),
topics: [{ topic: kafkaDefaultTopicInput.value }],
...(kafkaTopicsInput.value === kafkaTopicsType.Static && kafkaStaticTopicInput.value
? {
topics: [
{
topic: kafkaStaticTopicInput.value,
},
],
}
: kafkaTopicsInput.value === kafkaTopicsType.Dynamic && kafkaDynamicTopicInput.value
? {
topics: [
{
topic: `%{[${kafkaDynamicTopicInput.value}]}`,
},
],
}
: {}),
headers: kafkaHeadersInput.value,
timeout: parseIntegerIfStringDefined(kafkaBrokerTimeoutInput.value),
broker_timeout: parseIntegerIfStringDefined(
@ -969,7 +1033,9 @@ export function useOutputForm(onSucess: () => void, output?: Output, defaultOupu
kafkaPartitionTypeRandomInput.value,
kafkaPartitionTypeRoundRobinInput.value,
kafkaPartitionTypeHashInput.value,
kafkaDefaultTopicInput.value,
kafkaTopicsInput.value,
kafkaStaticTopicInput.value,
kafkaDynamicTopicInput.value,
kafkaHeadersInput.value,
kafkaBrokerTimeoutInput.value,
kafkaBrokerReachabilityTimeoutInput.value,
@ -980,9 +1046,9 @@ export function useOutputForm(onSucess: () => void, output?: Output, defaultOupu
sslCertificateAuthoritiesInput.value,
sslKeySecretInput.value,
elasticsearchUrlInput.value,
presetInput.value,
serviceTokenInput.value,
serviceTokenSecretInput.value,
presetInput.value,
caTrustedFingerprintInput.value,
confirm,
notifications.toasts,

View file

@ -7,7 +7,7 @@
import { useState, useCallback, useEffect } from 'react';
import type React from 'react';
import type { EuiSwitchEvent } from '@elastic/eui';
import type { EuiComboBoxOptionOption, EuiSwitchEvent } from '@elastic/eui';
export interface FormInput {
validate: () => boolean;
@ -392,3 +392,71 @@ export function useSelectInput(
setValue,
};
}
export function useComboBoxWithCustomInput(
id: string,
defaultValue: Array<EuiComboBoxOptionOption<string>> = [],
validate?: (value: Array<EuiComboBoxOptionOption<string>>) => string[] | undefined,
disabled = false
) {
const [selectedOptions, setSelected] = useState(defaultValue);
const [errors, setErrors] = useState<string[] | undefined>();
const onChange = useCallback(
(selected: Array<EuiComboBoxOptionOption<string>>) => {
setSelected(selected);
if (errors && validate) {
setErrors(validate(selected));
}
},
[validate, errors]
);
const onCreateOption = (searchValue: string) => {
const normalizedSearchValue = searchValue.trim();
const newOption = {
label: normalizedSearchValue,
value: normalizedSearchValue,
};
setSelected([newOption]);
if (validate) {
setErrors(validate([newOption]));
}
};
const validateCallback = useCallback(() => {
if (validate) {
const newErrors = validate(selectedOptions);
setErrors(newErrors);
return newErrors === undefined;
}
return true;
}, [validate, selectedOptions]);
const value = selectedOptions.length > 0 ? selectedOptions[0]?.value : '';
const isInvalid = errors !== undefined;
return {
props: {
id,
onChange,
onCreateOption,
errors,
isInvalid,
disabled,
selectedOptions,
},
formRowProps: {
error: errors,
isInvalid,
},
value,
clear: () => {
setSelected(defaultValue);
},
setSelected,
validate: validateCallback,
};
}

View file

@ -20724,7 +20724,6 @@
"xpack.fleet.settings.outputForm.kafkaPartitioningGroupEventsFormattingMessage": "Le nombre dévénements doit être indiqué en chiffres",
"xpack.fleet.settings.outputForm.kafkaPasswordIsRequired": "Le mot de passe est requis",
"xpack.fleet.settings.outputForm.kafkaPortError": "Numéro de port non valide. Nombre compris entre 1 et 65535 attendu",
"xpack.fleet.settings.outputForm.kafkaTopicConditionRequired": "Il doit s'agir d'une clé, paire de valeurs c.-à-d. \"http.response.code: 200\"",
"xpack.fleet.settings.outputForm.kafkaTopicRequiredMessage": "Un sujet est requis",
"xpack.fleet.settings.outputForm.kafkaUsernameIsRequired": "Le nom d'utilisateur est requis",
"xpack.fleet.settings.outputForm.logstashHostDuplicateError": "Hôte en double",

View file

@ -20714,7 +20714,6 @@
"xpack.fleet.settings.outputForm.kafkaPartitioningGroupEventsFormattingMessage": "イベント数は数値でなければなりません",
"xpack.fleet.settings.outputForm.kafkaPasswordIsRequired": "パスワードが必要です",
"xpack.fleet.settings.outputForm.kafkaPortError": "無効なポート番号です。1から65535までの数字でなければなりません",
"xpack.fleet.settings.outputForm.kafkaTopicConditionRequired": "キーと値のペアでなければなりません。例:\"http.response.code:200\"",
"xpack.fleet.settings.outputForm.kafkaTopicRequiredMessage": "トピックは必須です",
"xpack.fleet.settings.outputForm.kafkaUsernameIsRequired": "ユーザー名が必要です",
"xpack.fleet.settings.outputForm.logstashHostDuplicateError": "重複するホスト",

View file

@ -20742,7 +20742,6 @@
"xpack.fleet.settings.outputForm.kafkaPartitioningGroupEventsFormattingMessage": "事件数必须为数字",
"xpack.fleet.settings.outputForm.kafkaPasswordIsRequired": "“密码”必填",
"xpack.fleet.settings.outputForm.kafkaPortError": "端口号无效。应为介于 1 和 65535 之间的数字",
"xpack.fleet.settings.outputForm.kafkaTopicConditionRequired": "必须为键值对例如“http.response.code:200”",
"xpack.fleet.settings.outputForm.kafkaTopicRequiredMessage": "“主题”必填",
"xpack.fleet.settings.outputForm.kafkaUsernameIsRequired": "“用户名”必填",
"xpack.fleet.settings.outputForm.logstashHostDuplicateError": "主机重复",