[Fleet] Kafka integration API (#159110)

This PR addresses the API aspect of https://github.com/elastic/kibana/issues/143324.
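
For reference, a minimal request body for creating the new `kafka` output type could look like the sketch below. It is illustrative only: the values are made up, and it shows the fields required by the `output_create_request_kafka` schema added in this PR (`name`, `type`, `hosts`, `auth_type`, `topics`) plus a few optional ones.

```ts
// Illustrative Kafka output create request body (hypothetical values).
const kafkaOutputBody = {
  name: 'my-kafka-output',             // required
  type: 'kafka',                       // discriminator; selects output_create_request_kafka
  hosts: ['kafka-broker-1:9092'],      // required
  auth_type: 'user_pass',              // required; see the kafkaAuthType constants below
  username: 'elastic-agent',
  password: 'changeme',
  topics: [{ topic: 'logs-default' }], // required
  compression: 'gzip',
  partition: 'hash',
};

// Hypothetical usage: POST the body to the Fleet outputs endpoint.
// await fetch('/api/fleet/outputs', {
//   method: 'POST',
//   headers: { 'content-type': 'application/json', 'kbn-xsrf': 'true' },
//   body: JSON.stringify(kafkaOutputBody),
// });
```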

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Author: Konrad Szwarc, 2023-07-06 10:41:07 +02:00 (committed by GitHub)
Parent: 7e4e2bbf11
Commit: bbeccd71ea
29 changed files with 3012 additions and 446 deletions


@ -1630,6 +1630,111 @@
},
"allow_edit": {
"enabled": false
},
"version": {
"type": "keyword"
},
"key": {
"type": "keyword"
},
"compression": {
"type": "keyword"
},
"compression_level": {
"type": "integer"
},
"client_id": {
"type": "keyword"
},
"auth_type": {
"type": "keyword"
},
"username": {
"type": "keyword"
},
"password": {
"type": "text",
"index": false
},
"sasl": {
"dynamic": false,
"properties": {
"mechanism": {
"type": "text"
}
}
},
"partition": {
"type": "keyword"
},
"random": {
"dynamic": false,
"properties": {
"group_events": {
"type": "integer"
}
}
},
"round_robin": {
"dynamic": false,
"properties": {
"group_events": {
"type": "integer"
}
}
},
"hash": {
"dynamic": false,
"properties": {
"hash": {
"type": "text"
},
"random": {
"type": "boolean"
}
}
},
"topics": {
"dynamic": false,
"properties": {
"topic": {
"type": "keyword"
},
"when": {
"dynamic": false,
"properties": {
"type": {
"type": "text"
},
"condition": {
"type": "text"
}
}
}
}
},
"headers": {
"dynamic": false,
"properties": {
"key": {
"type": "text"
},
"value": {
"type": "text"
}
}
},
"timeout": {
"type": "integer"
},
"broker_timeout": {
"type": "integer"
},
"broker_ack_reliability": {
"type": "text"
},
"broker_buffer_size": {
"type": "integer"
}
}
},


@ -105,7 +105,7 @@ describe('checking migration metadata changes on all registered SO types', () =>
"infrastructure-ui-source": "113182d6895764378dfe7fa9fa027244f3a457c4",
"ingest-agent-policies": "f11cc19275f4c3e4ee7c5cd6423b6706b21b989d",
"ingest-download-sources": "d7edc5e588d9afa61c4b831604582891c54ef1c7",
"ingest-outputs": "3f1e998887d48a706333b67885d1ad8f3217cd90",
"ingest-outputs": "bffa0fd93dfdde904d7f5aff77df72d1c35938d9",
"ingest-package-policies": "7d0e8d288e193e0a8a153bb420c6056bc862c4c3",
"ingest_manager_settings": "418311b03c8eda53f5d2ea6f54c1356afaa65511",
"inventory-view": "b8683c8e352a286b4aca1ab21003115a4800af83",


@ -12,6 +12,50 @@ export const OUTPUT_SAVED_OBJECT_TYPE = 'ingest-outputs';
export const outputType = {
Elasticsearch: 'elasticsearch',
Logstash: 'logstash',
Kafka: 'kafka',
} as const;
export const kafkaCompressionType = {
None: 'none',
Snappy: 'snappy',
Lz4: 'lz4',
Gzip: 'gzip',
} as const;
export const kafkaAuthType = {
Userpass: 'user_pass',
Ssl: 'ssl',
Kerberos: 'kerberos',
} as const;
export const kafkaSaslMechanism = {
Plain: 'PLAIN',
ScramSha256: 'SCRAM-SHA-256',
ScramSha512: 'SCRAM-SHA-512',
} as const;
export const kafkaPartitionType = {
Random: 'random',
RoundRobin: 'round_robin',
Hash: 'hash',
} as const;
export const kafkaTopicWhenType = {
Equals: 'equals',
Contains: 'contains',
Regexp: 'regexp',
Range: 'range',
Network: 'network',
HasFields: 'has_fields',
Or: 'or',
And: 'and',
Not: 'not',
} as const;
export const kafkaAcknowledgeReliabilityLevel = {
Commit: 'Wait for local commit',
Replica: 'Wait for all replicas to commit',
DoNotWait: 'Do not wait',
} as const;
export const DEFAULT_OUTPUT_ID = 'fleet-default-output';
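
These `as const` objects serve both as runtime enums and as sources for literal union types. A small sketch of how the value unions are derived; the local `ValueOf` helper here is an assumption that mirrors the one Fleet's types import, i.e. `T[keyof T]`:

```ts
// Sketch: deriving literal unions from the constants above.
type ValueOf<T> = T[keyof T]; // assumed to match Fleet's ValueOf helper

type KafkaCompression = ValueOf<typeof kafkaCompressionType>; // 'none' | 'snappy' | 'lz4' | 'gzip'
type KafkaAuth = ValueOf<typeof kafkaAuthType>;               // 'user_pass' | 'ssl' | 'kerberos'

const compression: KafkaCompression = kafkaCompressionType.Gzip; // 'gzip'
const auth: KafkaAuth = kafkaAuthType.Userpass;                  // 'user_pass'
```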


@ -4359,7 +4359,7 @@
"items": {
"type": "array",
"items": {
"$ref": "#/components/schemas/output"
"$ref": "#/components/schemas/output_create_request"
}
},
"total": {
@ -4396,7 +4396,7 @@
"type": "object",
"properties": {
"item": {
"$ref": "#/components/schemas/output"
"$ref": "#/components/schemas/output_create_request"
}
}
}
@ -4408,46 +4408,11 @@
}
},
"requestBody": {
"required": true,
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
},
"type": {
"type": "string",
"enum": [
"elasticsearch"
]
},
"is_default": {
"type": "boolean"
},
"is_default_monitoring": {
"type": "boolean"
},
"hosts": {
"type": "array",
"items": {
"type": "string"
}
},
"ca_sha256": {
"type": "string"
},
"config_yaml": {
"type": "string"
}
},
"required": [
"name",
"type"
]
"$ref": "#/components/schemas/output_create_request"
}
}
}
@ -4467,15 +4432,7 @@
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"item": {
"$ref": "#/components/schemas/output"
}
},
"required": [
"item"
]
"$ref": "#/components/schemas/output_create_request"
}
}
}
@ -4541,43 +4498,7 @@
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"type": {
"type": "string",
"enum": [
"elasticsearch"
]
},
"is_default": {
"type": "boolean"
},
"is_default_monitoring": {
"type": "boolean"
},
"hosts": {
"type": "array",
"items": {
"type": "string"
}
},
"ca_sha256": {
"type": "string"
},
"ca_trusted_fingerprint": {
"type": "string"
},
"config_yaml": {
"type": "string"
}
},
"required": [
"name",
"type"
]
"$ref": "#/components/schemas/output_update_request"
}
}
}
@ -4588,15 +4509,7 @@
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"item": {
"$ref": "#/components/schemas/output"
}
},
"required": [
"item"
]
"$ref": "#/components/schemas/output_update_request"
}
}
}
@ -7595,8 +7508,293 @@
}
}
},
"output": {
"title": "Output",
"output_create_request_elasticsearch": {
"title": "elasticsearch",
"type": "object",
"properties": {
"id": {
"type": "string"
},
"is_default": {
"type": "boolean"
},
"is_default_monitoring": {
"type": "boolean"
},
"name": {
"type": "string"
},
"type": {
"type": "string",
"enum": [
"elasticsearch"
]
},
"hosts": {
"type": "array",
"items": {
"type": "string"
}
},
"ca_sha256": {
"type": "string"
},
"ca_trusted_fingerprint": {
"type": "string"
},
"config": {
"type": "object"
},
"config_yaml": {
"type": "string"
},
"ssl": {
"type": "object",
"properties": {
"certificate_authorities": {
"type": "array",
"items": {
"type": "string"
}
},
"certificate": {
"type": "string"
},
"key": {
"type": "string"
}
}
},
"proxy_id": {
"type": "string"
},
"shipper": {
"type": "object",
"properties": {
"disk_queue_enabled": {
"type": "boolean"
},
"disk_queue_path": {
"type": "string"
},
"disk_queue_max_size": {
"type": "number"
},
"disk_queue_encryption_enabled": {
"type": "boolean"
},
"disk_queue_compression_enabled": {
"type": "boolean"
},
"compression_level": {
"type": "number"
},
"loadbalance": {
"type": "boolean"
}
}
}
},
"required": [
"name"
]
},
"output_create_request_kafka": {
"title": "kafka",
"type": "object",
"properties": {
"id": {
"type": "string"
},
"is_default": {
"type": "boolean"
},
"is_default_monitoring": {
"type": "boolean"
},
"name": {
"type": "string"
},
"type": {
"type": "string",
"enum": [
"kafka"
]
},
"hosts": {
"type": "array",
"items": {
"type": "string"
}
},
"ca_sha256": {
"type": "string"
},
"ca_trusted_fingerprint": {
"type": "string"
},
"config": {
"type": "object"
},
"config_yaml": {
"type": "string"
},
"ssl": {
"type": "object",
"properties": {
"certificate_authorities": {
"type": "array",
"items": {
"type": "string"
}
},
"certificate": {
"type": "string"
},
"key": {
"type": "string"
}
}
},
"proxy_id": {
"type": "string"
},
"shipper": {
"type": "object",
"properties": {
"disk_queue_enabled": {
"type": "boolean"
},
"disk_queue_path": {
"type": "string"
},
"disk_queue_max_size": {
"type": "number"
},
"disk_queue_encryption_enabled": {
"type": "boolean"
},
"disk_queue_compression_enabled": {
"type": "boolean"
},
"compression_level": {
"type": "number"
},
"loadbalance": {
"type": "boolean"
}
}
},
"version": {
"type": "string"
},
"key": {
"type": "string"
},
"compression": {
"type": "string"
},
"compression_level": {
"type": "number"
},
"client_id": {
"type": "string"
},
"auth_type": {
"type": "string"
},
"username": {
"type": "string"
},
"password": {
"type": "string"
},
"sasl": {
"type": "object",
"properties": {
"mechanism": {
"type": "string"
}
}
},
"partition": {
"type": "string"
},
"random": {
"type": "object",
"properties": {
"group_events": {
"type": "number"
}
}
},
"round_robin": {
"type": "object",
"properties": {
"group_events": {
"type": "number"
}
}
},
"topics": {
"type": "array",
"items": {
"type": "object",
"properties": {
"topic": {
"type": "string"
},
"when": {
"type": "object",
"properties": {
"type": {
"type": "string"
},
"condition": {
"type": "string"
}
}
}
}
}
},
"headers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"key": {
"type": "string"
},
"value": {
"type": "string"
}
}
}
},
"timeout": {
"type": "number"
},
"broker_timeout": {
"type": "number"
},
"broker_buffer_size": {
"type": "number"
},
"broker_ack_reliability": {
"type": "string"
}
},
"required": [
"name",
"type",
"topics",
"auth_type",
"hosts"
]
},
"output_create_request_logstash": {
"title": "logstash",
"type": "object",
"properties": {
"id": {
@ -7614,7 +7812,6 @@
"type": {
"type": "string",
"enum": [
"elasticsearch",
"logstash"
]
},
@ -7684,12 +7881,429 @@
}
},
"required": [
"id",
"is_default",
"name",
"hosts",
"type"
]
},
"output_create_request": {
"title": "Output",
"oneOf": [
{
"$ref": "#/components/schemas/output_create_request_elasticsearch"
},
{
"$ref": "#/components/schemas/output_create_request_kafka"
},
{
"$ref": "#/components/schemas/output_create_request_logstash"
}
],
"discriminator": {
"propertyName": "type",
"mapping": {
"elasticsearch": "#/components/schemas/output_create_request_elasticsearch",
"kafka": "#/components/schemas/output_create_request_kafka",
"logstash": "#/components/schemas/output_create_request_logstash"
}
}
},
"output_update_request_elasticsearch": {
"title": "elasticsearch",
"type": "object",
"properties": {
"id": {
"type": "string"
},
"is_default": {
"type": "boolean"
},
"is_default_monitoring": {
"type": "boolean"
},
"name": {
"type": "string"
},
"type": {
"type": "string",
"enum": [
"elasticsearch"
]
},
"hosts": {
"type": "array",
"items": {
"type": "string"
}
},
"ca_sha256": {
"type": "string"
},
"ca_trusted_fingerprint": {
"type": "string"
},
"config": {
"type": "object"
},
"config_yaml": {
"type": "string"
},
"ssl": {
"type": "object",
"properties": {
"certificate_authorities": {
"type": "array",
"items": {
"type": "string"
}
},
"certificate": {
"type": "string"
},
"key": {
"type": "string"
}
}
},
"proxy_id": {
"type": "string"
},
"shipper": {
"type": "object",
"properties": {
"disk_queue_enabled": {
"type": "boolean"
},
"disk_queue_path": {
"type": "string"
},
"disk_queue_max_size": {
"type": "number"
},
"disk_queue_encryption_enabled": {
"type": "boolean"
},
"disk_queue_compression_enabled": {
"type": "boolean"
},
"compression_level": {
"type": "number"
},
"loadbalance": {
"type": "boolean"
}
}
}
},
"required": [
"name",
"hosts",
"type"
]
},
"output_update_request_kafka": {
"title": "kafka",
"type": "object",
"properties": {
"id": {
"type": "string"
},
"is_default": {
"type": "boolean"
},
"is_default_monitoring": {
"type": "boolean"
},
"name": {
"type": "string"
},
"type": {
"type": "string",
"enum": [
"kafka"
]
},
"hosts": {
"type": "array",
"items": {
"type": "string"
}
},
"ca_sha256": {
"type": "string"
},
"ca_trusted_fingerprint": {
"type": "string"
},
"config": {
"type": "object"
},
"config_yaml": {
"type": "string"
},
"ssl": {
"type": "object",
"properties": {
"certificate_authorities": {
"type": "array",
"items": {
"type": "string"
}
},
"certificate": {
"type": "string"
},
"key": {
"type": "string"
}
}
},
"proxy_id": {
"type": "string"
},
"shipper": {
"type": "object",
"properties": {
"disk_queue_enabled": {
"type": "boolean"
},
"disk_queue_path": {
"type": "string"
},
"disk_queue_max_size": {
"type": "number"
},
"disk_queue_encryption_enabled": {
"type": "boolean"
},
"disk_queue_compression_enabled": {
"type": "boolean"
},
"compression_level": {
"type": "number"
},
"loadbalance": {
"type": "boolean"
}
}
},
"version": {
"type": "string"
},
"key": {
"type": "string"
},
"compression": {
"type": "string"
},
"compression_level": {
"type": "number"
},
"client_id": {
"type": "string"
},
"auth_type": {
"type": "string"
},
"username": {
"type": "string"
},
"password": {
"type": "string"
},
"sasl": {
"type": "object",
"properties": {
"mechanism": {
"type": "string"
}
}
},
"partition": {
"type": "string"
},
"random": {
"type": "object",
"properties": {
"group_events": {
"type": "number"
}
}
},
"round_robin": {
"type": "object",
"properties": {
"group_events": {
"type": "number"
}
}
},
"topics": {
"type": "array",
"items": {
"type": "object",
"properties": {
"topic": {
"type": "string"
},
"when": {
"type": "object",
"properties": {
"type": {
"type": "string"
},
"condition": {
"type": "string"
}
}
}
}
}
},
"headers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"key": {
"type": "string"
},
"value": {
"type": "string"
}
}
}
},
"timeout": {
"type": "number"
},
"broker_timeout": {
"type": "number"
},
"broker_ack_reliability": {
"type": "string"
},
"broker_buffer_size": {
"type": "number"
}
},
"required": [
"name"
]
},
"output_update_request_logstash": {
"title": "logstash",
"type": "object",
"properties": {
"id": {
"type": "string"
},
"is_default": {
"type": "boolean"
},
"is_default_monitoring": {
"type": "boolean"
},
"name": {
"type": "string"
},
"type": {
"type": "string",
"enum": [
"logstash"
]
},
"hosts": {
"type": "array",
"items": {
"type": "string"
}
},
"ca_sha256": {
"type": "string"
},
"ca_trusted_fingerprint": {
"type": "string"
},
"config": {
"type": "object"
},
"config_yaml": {
"type": "string"
},
"ssl": {
"type": "object",
"properties": {
"certificate_authorities": {
"type": "array",
"items": {
"type": "string"
}
},
"certificate": {
"type": "string"
},
"key": {
"type": "string"
}
}
},
"proxy_id": {
"type": "string"
},
"shipper": {
"type": "object",
"properties": {
"disk_queue_enabled": {
"type": "boolean"
},
"disk_queue_path": {
"type": "string"
},
"disk_queue_max_size": {
"type": "number"
},
"disk_queue_encryption_enabled": {
"type": "boolean"
},
"disk_queue_compression_enabled": {
"type": "boolean"
},
"compression_level": {
"type": "number"
},
"loadbalance": {
"type": "boolean"
}
}
}
},
"required": [
"name"
]
},
"output_update_request": {
"title": "Output",
"oneOf": [
{
"$ref": "#/components/schemas/output_update_request_elasticsearch"
},
{
"$ref": "#/components/schemas/output_update_request_kafka"
},
{
"$ref": "#/components/schemas/output_update_request_logstash"
}
],
"discriminator": {
"propertyName": "type",
"mapping": {
"elasticsearch": "#/components/schemas/output_update_request_elasticsearch",
"kafka": "#/components/schemas/output_update_request_kafka",
"logstash": "#/components/schemas/output_update_request_logstash"
}
}
},
"download_sources": {
"title": "Download Source",
"type": "object",


@ -2721,7 +2721,7 @@ paths:
items:
type: array
items:
$ref: '#/components/schemas/output'
$ref: '#/components/schemas/output_create_request'
total:
type: integer
page:
@ -2744,38 +2744,15 @@ paths:
type: object
properties:
item:
$ref: '#/components/schemas/output'
$ref: '#/components/schemas/output_create_request'
'400':
$ref: '#/components/responses/error'
requestBody:
required: true
content:
application/json:
schema:
type: object
properties:
id:
type: string
name:
type: string
type:
type: string
enum:
- elasticsearch
is_default:
type: boolean
is_default_monitoring:
type: boolean
hosts:
type: array
items:
type: string
ca_sha256:
type: string
config_yaml:
type: string
required:
- name
- type
$ref: '#/components/schemas/output_create_request'
operationId: post-outputs
/outputs/{outputId}:
get:
@ -2788,12 +2765,7 @@ paths:
content:
application/json:
schema:
type: object
properties:
item:
$ref: '#/components/schemas/output'
required:
- item
$ref: '#/components/schemas/output_create_request'
'400':
$ref: '#/components/responses/error'
operationId: get-output
@ -2833,43 +2805,14 @@ paths:
content:
application/json:
schema:
type: object
properties:
name:
type: string
type:
type: string
enum:
- elasticsearch
is_default:
type: boolean
is_default_monitoring:
type: boolean
hosts:
type: array
items:
type: string
ca_sha256:
type: string
ca_trusted_fingerprint:
type: string
config_yaml:
type: string
required:
- name
- type
$ref: '#/components/schemas/output_update_request'
responses:
'200':
description: OK
content:
application/json:
schema:
type: object
properties:
item:
$ref: '#/components/schemas/output'
required:
- item
$ref: '#/components/schemas/output_update_request'
'400':
$ref: '#/components/responses/error'
parameters:
@ -4896,8 +4839,8 @@ components:
type: array
items:
$ref: '#/components/schemas/full_agent_policy_input'
output:
title: Output
output_create_request_elasticsearch:
title: elasticsearch
type: object
properties:
id:
@ -4912,6 +4855,193 @@ components:
type: string
enum:
- elasticsearch
hosts:
type: array
items:
type: string
ca_sha256:
type: string
ca_trusted_fingerprint:
type: string
config:
type: object
config_yaml:
type: string
ssl:
type: object
properties:
certificate_authorities:
type: array
items:
type: string
certificate:
type: string
key:
type: string
proxy_id:
type: string
shipper:
type: object
properties:
disk_queue_enabled:
type: boolean
disk_queue_path:
type: string
disk_queue_max_size:
type: number
disk_queue_encryption_enabled:
type: boolean
disk_queue_compression_enabled:
type: boolean
compression_level:
type: number
loadbalance:
type: boolean
required:
- name
output_create_request_kafka:
title: kafka
type: object
properties:
id:
type: string
is_default:
type: boolean
is_default_monitoring:
type: boolean
name:
type: string
type:
type: string
enum:
- kafka
hosts:
type: array
items:
type: string
ca_sha256:
type: string
ca_trusted_fingerprint:
type: string
config:
type: object
config_yaml:
type: string
ssl:
type: object
properties:
certificate_authorities:
type: array
items:
type: string
certificate:
type: string
key:
type: string
proxy_id:
type: string
shipper:
type: object
properties:
disk_queue_enabled:
type: boolean
disk_queue_path:
type: string
disk_queue_max_size:
type: number
disk_queue_encryption_enabled:
type: boolean
disk_queue_compression_enabled:
type: boolean
compression_level:
type: number
loadbalance:
type: boolean
version:
type: string
key:
type: string
compression:
type: string
compression_level:
type: number
client_id:
type: string
auth_type:
type: string
username:
type: string
password:
type: string
sasl:
type: object
properties:
mechanism:
type: string
partition:
type: string
random:
type: object
properties:
group_events:
type: number
round_robin:
type: object
properties:
group_events:
type: number
topics:
type: array
items:
type: object
properties:
topic:
type: string
when:
type: object
properties:
type:
type: string
condition:
type: string
headers:
type: array
items:
type: object
properties:
key:
type: string
value:
type: string
timeout:
type: number
broker_timeout:
type: number
broker_buffer_size:
type: number
broker_ack_reliability:
type: string
required:
- name
- type
- topics
- auth_type
- hosts
output_create_request_logstash:
title: logstash
type: object
properties:
id:
type: string
is_default:
type: boolean
is_default_monitoring:
type: boolean
name:
type: string
type:
type: string
enum:
- logstash
hosts:
type: array
@ -4956,10 +5086,279 @@ components:
loadbalance:
type: boolean
required:
- id
- is_default
- name
- hosts
- type
output_create_request:
title: Output
oneOf:
- $ref: '#/components/schemas/output_create_request_elasticsearch'
- $ref: '#/components/schemas/output_create_request_kafka'
- $ref: '#/components/schemas/output_create_request_logstash'
discriminator:
propertyName: type
mapping:
elasticsearch: '#/components/schemas/output_create_request_elasticsearch'
kafka: '#/components/schemas/output_create_request_kafka'
logstash: '#/components/schemas/output_create_request_logstash'
output_update_request_elasticsearch:
title: elasticsearch
type: object
properties:
id:
type: string
is_default:
type: boolean
is_default_monitoring:
type: boolean
name:
type: string
type:
type: string
enum:
- elasticsearch
hosts:
type: array
items:
type: string
ca_sha256:
type: string
ca_trusted_fingerprint:
type: string
config:
type: object
config_yaml:
type: string
ssl:
type: object
properties:
certificate_authorities:
type: array
items:
type: string
certificate:
type: string
key:
type: string
proxy_id:
type: string
shipper:
type: object
properties:
disk_queue_enabled:
type: boolean
disk_queue_path:
type: string
disk_queue_max_size:
type: number
disk_queue_encryption_enabled:
type: boolean
disk_queue_compression_enabled:
type: boolean
compression_level:
type: number
loadbalance:
type: boolean
required:
- name
- hosts
- type
output_update_request_kafka:
title: kafka
type: object
properties:
id:
type: string
is_default:
type: boolean
is_default_monitoring:
type: boolean
name:
type: string
type:
type: string
enum:
- kafka
hosts:
type: array
items:
type: string
ca_sha256:
type: string
ca_trusted_fingerprint:
type: string
config:
type: object
config_yaml:
type: string
ssl:
type: object
properties:
certificate_authorities:
type: array
items:
type: string
certificate:
type: string
key:
type: string
proxy_id:
type: string
shipper:
type: object
properties:
disk_queue_enabled:
type: boolean
disk_queue_path:
type: string
disk_queue_max_size:
type: number
disk_queue_encryption_enabled:
type: boolean
disk_queue_compression_enabled:
type: boolean
compression_level:
type: number
loadbalance:
type: boolean
version:
type: string
key:
type: string
compression:
type: string
compression_level:
type: number
client_id:
type: string
auth_type:
type: string
username:
type: string
password:
type: string
sasl:
type: object
properties:
mechanism:
type: string
partition:
type: string
random:
type: object
properties:
group_events:
type: number
round_robin:
type: object
properties:
group_events:
type: number
topics:
type: array
items:
type: object
properties:
topic:
type: string
when:
type: object
properties:
type:
type: string
condition:
type: string
headers:
type: array
items:
type: object
properties:
key:
type: string
value:
type: string
timeout:
type: number
broker_timeout:
type: number
broker_ack_reliability:
type: string
broker_buffer_size:
type: number
required:
- name
output_update_request_logstash:
title: logstash
type: object
properties:
id:
type: string
is_default:
type: boolean
is_default_monitoring:
type: boolean
name:
type: string
type:
type: string
enum:
- logstash
hosts:
type: array
items:
type: string
ca_sha256:
type: string
ca_trusted_fingerprint:
type: string
config:
type: object
config_yaml:
type: string
ssl:
type: object
properties:
certificate_authorities:
type: array
items:
type: string
certificate:
type: string
key:
type: string
proxy_id:
type: string
shipper:
type: object
properties:
disk_queue_enabled:
type: boolean
disk_queue_path:
type: string
disk_queue_max_size:
type: number
disk_queue_encryption_enabled:
type: boolean
disk_queue_compression_enabled:
type: boolean
compression_level:
type: number
loadbalance:
type: boolean
required:
- name
output_update_request:
title: Output
oneOf:
- $ref: '#/components/schemas/output_update_request_elasticsearch'
- $ref: '#/components/schemas/output_update_request_kafka'
- $ref: '#/components/schemas/output_update_request_logstash'
discriminator:
propertyName: type
mapping:
elasticsearch: '#/components/schemas/output_update_request_elasticsearch'
kafka: '#/components/schemas/output_update_request_kafka'
logstash: '#/components/schemas/output_update_request_logstash'
download_sources:
title: Download Source
type: object


@ -0,0 +1,11 @@
title: Output
oneOf:
- $ref: './output_create_request_elasticsearch.yaml'
- $ref: './output_create_request_kafka.yaml'
- $ref: './output_create_request_logstash.yaml'
discriminator:
propertyName: type
mapping:
elasticsearch: './output_create_request_elasticsearch.yaml'
kafka: './output_create_request_kafka.yaml'
logstash: './output_create_request_logstash.yaml'
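
On the TypeScript side, this `oneOf` plus `discriminator` corresponds to a discriminated union keyed on `type`. A hedged sketch; the type and function names are illustrative, not Fleet exports:

```ts
// Illustrative mirror of the OpenAPI discriminator above.
type OutputCreateRequest =
  | { type: 'elasticsearch'; name: string; hosts?: string[] }
  | { type: 'logstash'; name: string; hosts: string[] }
  | { type: 'kafka'; name: string; hosts: string[]; auth_type: string; topics: Array<{ topic: string }> };

// The `type` property decides which per-type schema applies.
function schemaRefFor(req: OutputCreateRequest): string {
  switch (req.type) {
    case 'elasticsearch':
      return './output_create_request_elasticsearch.yaml';
    case 'kafka':
      return './output_create_request_kafka.yaml';
    case 'logstash':
      return './output_create_request_logstash.yaml';
  }
}
```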


@ -0,0 +1,58 @@
title: elasticsearch
type: object
properties:
id:
type: string
is_default:
type: boolean
is_default_monitoring:
type: boolean
name:
type: string
type:
type: string
enum: ['elasticsearch']
hosts:
type: array
items:
type: string
ca_sha256:
type: string
ca_trusted_fingerprint:
type: string
config:
type: object
config_yaml:
type: string
ssl:
type: object
properties:
certificate_authorities:
type: array
items:
type: string
certificate:
type: string
key:
type: string
proxy_id:
type: string
shipper:
type: object
properties:
disk_queue_enabled:
type: boolean
disk_queue_path:
type: string
disk_queue_max_size:
type: number
disk_queue_encryption_enabled:
type: boolean
disk_queue_compression_enabled:
type: boolean
compression_level:
type: number
loadbalance:
type: boolean
required:
- name


@ -0,0 +1,126 @@
title: kafka
type: object
properties:
id:
type: string
is_default:
type: boolean
is_default_monitoring:
type: boolean
name:
type: string
type:
type: string
enum: ['kafka']
hosts:
type: array
items:
type: string
ca_sha256:
type: string
ca_trusted_fingerprint:
type: string
config:
type: object
config_yaml:
type: string
ssl:
type: object
properties:
certificate_authorities:
type: array
items:
type: string
certificate:
type: string
key:
type: string
proxy_id:
type: string
shipper:
type: object
properties:
disk_queue_enabled:
type: boolean
disk_queue_path:
type: string
disk_queue_max_size:
type: number
disk_queue_encryption_enabled:
type: boolean
disk_queue_compression_enabled:
type: boolean
compression_level:
type: number
loadbalance:
type: boolean
version:
type: string
key:
type: string
compression:
type: string
compression_level:
type: number
client_id:
type: string
auth_type:
type: string
username:
type: string
password:
type: string
sasl:
type: object
properties:
mechanism:
type: string
partition:
type: string
random:
type: object
properties:
group_events:
type: number
round_robin:
type: object
properties:
group_events:
type: number
topics:
type: array
items:
type: object
properties:
topic:
type: string
when:
type: object
properties:
type:
type: string
condition:
type: string
headers:
type: array
items:
type: object
properties:
key:
type: string
value:
type: string
timeout:
type: number
broker_timeout:
type: number
broker_buffer_size:
type: number
broker_ack_reliability:
type: string
required:
- name
- type
- topics
- auth_type
- hosts


@ -1,4 +1,4 @@
title: Output
title: logstash
type: object
properties:
id:
@ -11,7 +11,7 @@ properties:
type: string
type:
type: string
enum: ['elasticsearch', 'logstash']
enum: ['logstash']
hosts:
type: array
items:
@ -55,7 +55,6 @@ properties:
loadbalance:
type: boolean
required:
- id
- is_default
- name
- hosts
- type


@ -0,0 +1,11 @@
title: Output
oneOf:
- $ref: './output_update_request_elasticsearch.yaml'
- $ref: './output_update_request_kafka.yaml'
- $ref: './output_update_request_logstash.yaml'
discriminator:
propertyName: type
mapping:
elasticsearch: './output_update_request_elasticsearch.yaml'
kafka: './output_update_request_kafka.yaml'
logstash: './output_update_request_logstash.yaml'


@ -0,0 +1,60 @@
title: elasticsearch
type: object
properties:
id:
type: string
is_default:
type: boolean
is_default_monitoring:
type: boolean
name:
type: string
type:
type: string
enum: ['elasticsearch']
hosts:
type: array
items:
type: string
ca_sha256:
type: string
ca_trusted_fingerprint:
type: string
config:
type: object
config_yaml:
type: string
ssl:
type: object
properties:
certificate_authorities:
type: array
items:
type: string
certificate:
type: string
key:
type: string
proxy_id:
type: string
shipper:
type: object
properties:
disk_queue_enabled:
type: boolean
disk_queue_path:
type: string
disk_queue_max_size:
type: number
disk_queue_encryption_enabled:
type: boolean
disk_queue_compression_enabled:
type: boolean
compression_level:
type: number
loadbalance:
type: boolean
required:
- name
- hosts
- type


@ -0,0 +1,122 @@
title: kafka
type: object
properties:
id:
type: string
is_default:
type: boolean
is_default_monitoring:
type: boolean
name:
type: string
type:
type: string
enum: ['kafka']
hosts:
type: array
items:
type: string
ca_sha256:
type: string
ca_trusted_fingerprint:
type: string
config:
type: object
config_yaml:
type: string
ssl:
type: object
properties:
certificate_authorities:
type: array
items:
type: string
certificate:
type: string
key:
type: string
proxy_id:
type: string
shipper:
type: object
properties:
disk_queue_enabled:
type: boolean
disk_queue_path:
type: string
disk_queue_max_size:
type: number
disk_queue_encryption_enabled:
type: boolean
disk_queue_compression_enabled:
type: boolean
compression_level:
type: number
loadbalance:
type: boolean
version:
type: string
key:
type: string
compression:
type: string
compression_level:
type: number
client_id:
type: string
auth_type:
type: string
username:
type: string
password:
type: string
sasl:
type: object
properties:
mechanism:
type: string
partition:
type: string
random:
type: object
properties:
group_events:
type: number
round_robin:
type: object
properties:
group_events:
type: number
topics:
type: array
items:
type: object
properties:
topic:
type: string
when:
type: object
properties:
type:
type: string
condition:
type: string
headers:
type: array
items:
type: object
properties:
key:
type: string
value:
type: string
timeout:
type: number
broker_timeout:
type: number
broker_ack_reliability:
type: string
broker_buffer_size:
type: number
required:
- name


@ -0,0 +1,58 @@
title: logstash
type: object
properties:
id:
type: string
is_default:
type: boolean
is_default_monitoring:
type: boolean
name:
type: string
type:
type: string
enum: ['logstash']
hosts:
type: array
items:
type: string
ca_sha256:
type: string
ca_trusted_fingerprint:
type: string
config:
type: object
config_yaml:
type: string
ssl:
type: object
properties:
certificate_authorities:
type: array
items:
type: string
certificate:
type: string
key:
type: string
proxy_id:
type: string
shipper:
type: object
properties:
disk_queue_enabled:
type: boolean
disk_queue_path:
type: string
disk_queue_max_size:
type: number
disk_queue_encryption_enabled:
type: boolean
disk_queue_compression_enabled:
type: boolean
compression_level:
type: number
loadbalance:
type: boolean
required:
- name


@ -13,7 +13,7 @@ get:
items:
type: array
items:
$ref: ../components/schemas/output.yaml
$ref: ../components/schemas/output_create_request.yaml
total:
type: integer
page:
@ -36,35 +36,13 @@ post:
type: object
properties:
item:
$ref: ../components/schemas/output.yaml
$ref: ../components/schemas/output_create_request.yaml
'400':
$ref: ../components/responses/error.yaml
requestBody:
required: true
content:
application/json:
schema:
type: object
properties:
id:
type: string
name:
type: string
type:
type: string
enum: ['elasticsearch']
is_default:
type: boolean
is_default_monitoring:
type: boolean
hosts:
type: array
items:
type: string
ca_sha256:
type: string
config_yaml:
type: string
required:
- name
- type
$ref: ../components/schemas/output_create_request.yaml
operationId: post-outputs


@ -8,12 +8,7 @@ get:
content:
application/json:
schema:
type: object
properties:
item:
$ref: ../components/schemas/output.yaml
required:
- item
$ref: ../components/schemas/output_create_request.yaml
'400':
$ref: ../components/responses/error.yaml
operationId: get-output
@ -53,42 +48,14 @@ put:
content:
application/json:
schema:
type: object
properties:
name:
type: string
type:
type: string
enum: ['elasticsearch']
is_default:
type: boolean
is_default_monitoring:
type: boolean
hosts:
type: array
items:
type: string
ca_sha256:
type: string
ca_trusted_fingerprint:
type: string
config_yaml:
type: string
required:
- name
- type
$ref: ../components/schemas/output_update_request.yaml
responses:
'200':
description: OK
content:
application/json:
schema:
type: object
properties:
item:
$ref: ../components/schemas/output.yaml
required:
- item
$ref: ../components/schemas/output_update_request.yaml
'400':
$ref: ../components/responses/error.yaml
parameters:


@ -7,10 +7,20 @@
import type { outputType } from '../../constants';
import type { ValueOf } from '..';
import type { kafkaAuthType, kafkaCompressionType, kafkaSaslMechanism } from '../../constants';
import type { kafkaPartitionType } from '../../constants';
import type { kafkaTopicWhenType } from '../../constants';
import type { kafkaAcknowledgeReliabilityLevel } from '../../constants';
export type OutputType = typeof outputType;
export type KafkaCompressionType = typeof kafkaCompressionType;
export type KafkaAuthType = typeof kafkaAuthType;
export type KafkaSaslMechanism = typeof kafkaSaslMechanism;
export type KafkaPartitionType = typeof kafkaPartitionType;
export type KafkaTopicWhenType = typeof kafkaTopicWhenType;
export type KafkaAcknowledgeReliabilityLevel = typeof kafkaAcknowledgeReliabilityLevel;
export interface NewOutput {
interface NewBaseOutput {
is_default: boolean;
is_default_monitoring: boolean;
is_preconfigured?: boolean;
@ -30,6 +40,16 @@ export interface NewOutput {
allow_edit?: string[];
}
export interface NewElasticsearchOutput extends NewBaseOutput {
type: OutputType['Elasticsearch'];
}
export interface NewLogstashOutput extends NewBaseOutput {
type: OutputType['Logstash'];
}
export type NewOutput = NewElasticsearchOutput | NewLogstashOutput | KafkaOutput;
export type Output = NewOutput & {
id: string;
};
@ -46,3 +66,45 @@ export interface ShipperOutput {
queue_flush_timeout?: number | null;
max_batch_bytes?: number | null;
}
export interface KafkaOutput extends NewBaseOutput {
type: OutputType['Kafka'];
hosts?: string[];
client_id?: string;
version?: string;
key?: string;
compression?: ValueOf<KafkaCompressionType>;
compression_level?: number;
auth_type?: ValueOf<KafkaAuthType>;
username?: string;
password?: string;
sasl?: {
mechanism?: ValueOf<KafkaSaslMechanism>;
};
partition?: ValueOf<KafkaPartitionType>;
random?: {
group_events?: number;
};
round_robin?: {
group_events?: number;
};
hash?: {
hash?: string;
random?: boolean;
};
topics?: Array<{
topic: string;
when?: {
type?: ValueOf<KafkaTopicWhenType>;
condition?: string;
};
}>;
headers?: Array<{
key: string;
value: string;
}>;
timeout?: number;
broker_timeout?: number;
broker_ack_reliability?: ValueOf<KafkaAcknowledgeReliabilityLevel>;
broker_buffer_size?: number;
}
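
A hedged example of a value satisfying the new `KafkaOutput` interface; the values are invented, while the field names and enum values come from the interface and constants in this PR:

```ts
// Illustrative KafkaOutput value; assumes NewBaseOutput requires name, is_default,
// and is_default_monitoring, as in the existing Fleet output types.
const exampleKafkaOutput: KafkaOutput = {
  name: 'kafka-1',
  type: 'kafka',
  is_default: false,
  is_default_monitoring: false,
  hosts: ['broker-1:9092', 'broker-2:9092'],
  client_id: 'Elastic Agent',
  auth_type: 'user_pass',
  username: 'agent',
  password: 'secret',
  sasl: { mechanism: 'PLAIN' },
  partition: 'hash',
  hash: { hash: 'event.dataset', random: false },
  topics: [{ topic: 'logs', when: { type: 'contains', condition: 'message:error' } }],
  headers: [{ key: 'env', value: 'prod' }],
  compression: 'gzip',
  compression_level: 4,
  timeout: 30,
  broker_timeout: 10,
  broker_ack_reliability: 'Wait for local commit',
  broker_buffer_size: 256,
};
```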


@ -11,6 +11,8 @@ import { EuiBasicTable, EuiButtonIcon, EuiFlexGroup, EuiFlexItem, EuiIconTip } f
import type { EuiBasicTableColumn } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { outputType } from '../../../../../../../common/constants';
import { useLink } from '../../../../hooks';
import type { Output } from '../../../../types';
@ -136,6 +138,7 @@ export const OutputsTable: React.FunctionComponent<OutputsTableProps> = ({
defaultMessage: 'Edit',
})}
data-test-subj="editOutputBtn"
isDisabled={output.type === outputType.Kafka} // Kafka output is not yet supported in the UI, but can be created via the API
/>
</EuiFlexItem>
</EuiFlexGroup>


@ -66,7 +66,7 @@ export const getOneOuputHandler: RequestHandler<
}
};
export const putOuputHandler: RequestHandler<
export const putOutputHandler: RequestHandler<
TypeOf<typeof PutOutputRequestSchema.params>,
undefined,
TypeOf<typeof PutOutputRequestSchema.body>
@ -99,7 +99,7 @@ export const putOuputHandler: RequestHandler<
}
};
export const postOuputHandler: RequestHandler<
export const postOutputHandler: RequestHandler<
undefined,
undefined,
TypeOf<typeof PostOutputRequestSchema.body>


@ -20,8 +20,8 @@ import {
deleteOutputHandler,
getOneOuputHandler,
getOutputsHandler,
postOuputHandler,
putOuputHandler,
postOutputHandler,
putOutputHandler,
postLogstashApiKeyHandler,
} from './handler';
@ -54,7 +54,7 @@ export const registerRoutes = (router: FleetAuthzRouter) => {
fleet: { all: true },
},
},
putOuputHandler
putOutputHandler
);
router.post(
@ -65,7 +65,7 @@ export const registerRoutes = (router: FleetAuthzRouter) => {
fleet: { all: true },
},
},
postOuputHandler
postOutputHandler
);
router.delete(


@ -164,6 +164,64 @@ const getSavedObjectTypes = (): { [key: string]: SavedObjectsType } => ({
properties: {},
},
allow_edit: { enabled: false },
version: { type: 'keyword' },
key: { type: 'keyword' },
compression: { type: 'keyword' },
compression_level: { type: 'integer' },
client_id: { type: 'keyword' },
auth_type: { type: 'keyword' },
username: { type: 'keyword' },
password: { type: 'text', index: false },
sasl: {
dynamic: false,
properties: {
mechanism: { type: 'text' },
},
},
partition: { type: 'keyword' },
random: {
dynamic: false,
properties: {
group_events: { type: 'integer' },
},
},
round_robin: {
dynamic: false,
properties: {
group_events: { type: 'integer' },
},
},
hash: {
dynamic: false,
properties: {
hash: { type: 'text' },
random: { type: 'boolean' },
},
},
topics: {
dynamic: false,
properties: {
topic: { type: 'keyword' },
when: {
dynamic: false,
properties: {
type: { type: 'text' },
condition: { type: 'text' },
},
},
},
},
headers: {
dynamic: false,
properties: {
key: { type: 'text' },
value: { type: 'text' },
},
},
timeout: { type: 'integer' },
broker_timeout: { type: 'integer' },
broker_ack_reliability: { type: 'text' },
broker_buffer_size: { type: 'integer' },
},
},
migrations: {


@ -272,6 +272,55 @@ export function transformOutputToFullPolicyOutput(
const isShipperDisabled = !configJs?.shipper || configJs?.shipper?.enabled === false;
let shipperDiskQueueData = {};
let generalShipperData;
let kafkaData = {};
if (type === outputType.Kafka) {
/* eslint-disable @typescript-eslint/naming-convention */
const {
client_id,
version,
key,
compression,
compression_level,
auth_type,
username,
password,
sasl,
partition,
random,
round_robin,
hash,
topics,
headers,
timeout,
broker_timeout,
broker_buffer_size,
broker_ack_reliability,
} = output;
/* eslint-enable @typescript-eslint/naming-convention */
kafkaData = {
client_id,
version,
key,
compression,
compression_level,
auth_type,
username,
password,
sasl,
partition,
random,
round_robin,
hash,
topics,
headers,
timeout,
broker_timeout,
broker_buffer_size,
broker_ack_reliability,
};
}
if (shipper) {
if (!isShipperDisabled) {
@ -301,6 +350,7 @@ export function transformOutputToFullPolicyOutput(
...shipperDiskQueueData,
type,
hosts,
...kafkaData,
...(!isShipperDisabled ? generalShipperData : {}),
...(ca_sha256 ? { ca_sha256 } : {}),
...(ssl ? { ssl } : {}),


@ -36,8 +36,11 @@ export async function getDataOutputForAgentPolicy(
/**
* Validate outputs are valid for a policy using the current kibana licence or throw.
* @param data
* @returns
* @param soClient
* @param newData
* @param existingData
* @param allowedOutputTypeForPolicy
*/
export async function validateOutputForPolicy(
soClient: SavedObjectsClientContract,


@ -109,6 +109,13 @@ function getMockedSoClient(
});
}
case outputIdToUuid('existing-kafka-output'): {
return mockOutputSO('existing-kafka-output', {
type: 'kafka',
is_default: false,
});
}
case outputIdToUuid('existing-es-output'): {
return mockOutputSO('existing-es-output', {
type: 'elasticsearch',
@ -194,6 +201,37 @@ function getMockedSoClient(
describe('Output Service', () => {
const esClientMock = elasticsearchServiceMock.createElasticsearchClient();
const mockedAgentPolicyResolvedValue = {
items: [
{
name: 'fleet server policy',
id: 'fleet_server_policy',
is_default_fleet_server: true,
package_policies: [
{
name: 'fleet-server-123',
package: {
name: 'fleet_server',
},
},
],
},
{
name: 'agent policy 1',
id: 'agent_policy_1',
is_managed: false,
package_policies: [
{
name: 'nginx',
package: {
name: 'nginx',
},
},
],
},
],
} as unknown as ReturnType<typeof mockedAgentPolicyService.list>;
beforeEach(() => {
mockedAgentPolicyService.list.mockClear();
mockedAgentPolicyService.hasAPMIntegration.mockClear();
@ -408,36 +446,7 @@ describe('Output Service', () => {
mockedAppContextService.getEncryptedSavedObjectsSetup.mockReturnValue({
canEncrypt: true,
} as any);
mockedAgentPolicyService.list.mockResolvedValue({
items: [
{
name: 'fleet server policy',
id: 'fleet_server_policy',
is_default_fleet_server: true,
package_policies: [
{
name: 'fleet-server-123',
package: {
name: 'fleet_server',
},
},
],
},
{
name: 'agent policy 1',
id: 'agent_policy_1',
is_managed: false,
package_policies: [
{
name: 'nginx',
package: {
name: 'nginx',
},
},
],
},
],
} as unknown as ReturnType<typeof mockedAgentPolicyService.list>);
mockedAgentPolicyService.list.mockResolvedValue(mockedAgentPolicyResolvedValue);
mockedAgentPolicyService.hasFleetServerIntegration.mockReturnValue(true);
await outputService.create(
@ -468,36 +477,7 @@ describe('Output Service', () => {
mockedAppContextService.getEncryptedSavedObjectsSetup.mockReturnValue({
canEncrypt: true,
} as any);
mockedAgentPolicyService.list.mockResolvedValue({
items: [
{
name: 'fleet server policy',
id: 'fleet_server_policy',
is_default_fleet_server: true,
package_policies: [
{
name: 'fleet-server-123',
package: {
name: 'fleet_server',
},
},
],
},
{
name: 'agent policy 1',
id: 'agent_policy_1',
is_managed: false,
package_policies: [
{
name: 'nginx',
package: {
name: 'nginx',
},
},
],
},
],
} as unknown as ReturnType<typeof mockedAgentPolicyService.list>);
mockedAgentPolicyService.list.mockResolvedValue(mockedAgentPolicyResolvedValue);
mockedAgentPolicyService.hasFleetServerIntegration.mockReturnValue(true);
await outputService.create(
@ -534,6 +514,61 @@ describe('Output Service', () => {
savedObjectType: OUTPUT_SAVED_OBJECT_TYPE,
});
});
// With kafka output
it('Should update fleet server policies with data_output_id=default_output_id if a new default kafka output is created', async () => {
const soClient = getMockedSoClient({
defaultOutputId: 'output-test',
});
mockedAppContextService.getEncryptedSavedObjectsSetup.mockReturnValue({
canEncrypt: true,
} as any);
mockedAgentPolicyService.list.mockResolvedValue(mockedAgentPolicyResolvedValue);
mockedAgentPolicyService.hasFleetServerIntegration.mockReturnValue(true);
await outputService.create(
soClient,
esClientMock,
{
is_default: true,
is_default_monitoring: false,
name: 'Test',
type: 'kafka',
},
{ id: 'output-1' }
);
expect(mockedAgentPolicyService.update).toBeCalledWith(
expect.anything(),
expect.anything(),
'fleet_server_policy',
{ data_output_id: 'output-test' },
{ force: false }
);
});
it('Should allow to create a new kafka output with no errors if is not set as default', async () => {
const soClient = getMockedSoClient({
defaultOutputId: 'output-test',
});
mockedAppContextService.getEncryptedSavedObjectsSetup.mockReturnValue({
canEncrypt: true,
} as any);
mockedAgentPolicyService.list.mockResolvedValue(mockedAgentPolicyResolvedValue);
mockedAgentPolicyService.hasFleetServerIntegration.mockReturnValue(true);
await outputService.create(
soClient,
esClientMock,
{
is_default: false,
is_default_monitoring: false,
name: 'Test',
type: 'kafka',
},
{ id: 'output-1' }
);
});
});
describe('update', () => {
@ -744,6 +779,44 @@ describe('Output Service', () => {
});
});
it('Should delete Kafka specific fields if the output type change to ES', async () => {
const soClient = getMockedSoClient({});
mockedAgentPolicyService.list.mockResolvedValue({
items: [{}],
} as unknown as ReturnType<typeof mockedAgentPolicyService.list>);
mockedAgentPolicyService.hasAPMIntegration.mockReturnValue(false);
await outputService.update(soClient, esClientMock, 'existing-kafka-output', {
type: 'elasticsearch',
hosts: ['http://test:4343'],
});
expect(soClient.update).toBeCalledWith(expect.anything(), expect.anything(), {
type: 'elasticsearch',
hosts: ['http://test:4343'],
auth_type: null,
broker_timeout: null,
broker_ack_reliability: null,
broker_buffer_size: null,
client_id: null,
compression: null,
compression_level: null,
hash: null,
key: null,
partition: null,
password: null,
random: null,
round_robin: null,
sasl: null,
ssl: null,
timeout: null,
topics: null,
headers: null,
username: null,
version: null,
});
});
// With logstash output
it('Should work if you try to make that output the default output and no policies using default output has APM integration', async () => {
const soClient = getMockedSoClient({});
@ -784,7 +857,7 @@ describe('Output Service', () => {
});
});
it('Should throw if you try to make that output the default output and somne policies using default output has APM integration', async () => {
it('Should throw if you try to make that output the default output and some policies using default output has APM integration', async () => {
const soClient = getMockedSoClient({});
mockedAgentPolicyService.list.mockResolvedValue({
items: [{}],
@ -819,6 +892,46 @@ describe('Output Service', () => {
});
});
it('Should delete Kafka specific fields if the output type changes to logstash', async () => {
const soClient = getMockedSoClient({});
mockedAgentPolicyService.list.mockResolvedValue({
items: [{}],
} as unknown as ReturnType<typeof mockedAgentPolicyService.list>);
mockedAgentPolicyService.hasAPMIntegration.mockReturnValue(false);
mockedAgentPolicyService.hasFleetServerIntegration.mockReturnValue(false);
await outputService.update(soClient, esClientMock, 'existing-kafka-output', {
type: 'logstash',
hosts: ['test:4343'],
});
expect(soClient.update).toBeCalledWith(expect.anything(), expect.anything(), {
type: 'logstash',
hosts: ['test:4343'],
ca_sha256: null,
ca_trusted_fingerprint: null,
auth_type: null,
broker_timeout: null,
broker_ack_reliability: null,
broker_buffer_size: null,
client_id: null,
compression: null,
compression_level: null,
hash: null,
key: null,
partition: null,
password: null,
random: null,
round_robin: null,
sasl: null,
timeout: null,
topics: null,
headers: null,
username: null,
version: null,
});
});
it('Should update fleet server policies with data_output_id=default_output_id if a default ES output is changed to logstash', async () => {
const soClient = getMockedSoClient({
defaultOutputId: 'output-test',
@ -1000,6 +1113,152 @@ describe('Output Service', () => {
savedObjectType: OUTPUT_SAVED_OBJECT_TYPE,
});
});
// With Kafka output
it('Should delete ES specific fields if the output type changes to kafka', async () => {
const soClient = getMockedSoClient({});
mockedAgentPolicyService.list.mockResolvedValue({
items: [{}],
} as unknown as ReturnType<typeof mockedAgentPolicyService.list>);
mockedAgentPolicyService.hasAPMIntegration.mockReturnValue(false);
mockedAgentPolicyService.hasFleetServerIntegration.mockReturnValue(false);
await outputService.update(soClient, esClientMock, 'existing-es-output', {
type: 'kafka',
hosts: ['test:4343'],
});
expect(soClient.update).toBeCalledWith(expect.anything(), expect.anything(), {
type: 'kafka',
hosts: ['test:4343'],
ca_sha256: null,
ca_trusted_fingerprint: null,
broker_timeout: 10,
broker_ack_reliability: 'Wait for local commit',
broker_buffer_size: 256,
client_id: 'Elastic Agent',
compression: 'gzip',
compression_level: 4,
partition: 'hash',
timeout: 30,
version: '1.0.0',
});
});
it('Should delete Logstash specific fields if the output type changes to kafka', async () => {
const soClient = getMockedSoClient({});
mockedAgentPolicyService.list.mockResolvedValue({
items: [{}],
} as unknown as ReturnType<typeof mockedAgentPolicyService.list>);
mockedAgentPolicyService.hasAPMIntegration.mockReturnValue(false);
mockedAgentPolicyService.hasFleetServerIntegration.mockReturnValue(false);
await outputService.update(soClient, esClientMock, 'existing-logstash-output', {
type: 'kafka',
hosts: ['test:4343'],
});
expect(soClient.update).toBeCalledWith(expect.anything(), expect.anything(), {
hosts: ['test:4343'],
broker_timeout: 10,
broker_ack_reliability: 'Wait for local commit',
broker_buffer_size: 256,
ca_sha256: null,
ca_trusted_fingerprint: null,
client_id: 'Elastic Agent',
compression: 'gzip',
compression_level: 4,
partition: 'hash',
timeout: 30,
type: 'kafka',
version: '1.0.0',
});
});
it('Should update fleet server policies with data_output_id=default_output_id if a default ES output is changed to kafka', async () => {
const soClient = getMockedSoClient({
defaultOutputId: 'output-test',
});
mockedAgentPolicyService.list.mockResolvedValue(mockedAgentPolicyResolvedValue);
mockedAgentPolicyService.hasFleetServerIntegration.mockReturnValue(true);
await outputService.update(soClient, esClientMock, 'output-test', {
type: 'kafka',
hosts: ['http://test:4343'],
is_default: true,
});
expect(soClient.update).toBeCalledWith(expect.anything(), expect.anything(), {
type: 'kafka',
hosts: ['http://test:4343'],
is_default: true,
ca_sha256: null,
ca_trusted_fingerprint: null,
client_id: 'Elastic Agent',
compression: 'gzip',
compression_level: 4,
partition: 'hash',
timeout: 30,
version: '1.0.0',
broker_timeout: 10,
broker_ack_reliability: 'Wait for local commit',
broker_buffer_size: 256,
});
expect(mockedAgentPolicyService.update).toBeCalledWith(
expect.anything(),
expect.anything(),
'fleet_server_policy',
{ data_output_id: 'output-test' },
{ force: false }
);
});
it('Should update fleet server policies with data_output_id=default_output_id and force=true if a default ES output is changed to kafka, from preconfiguration', async () => {
const soClient = getMockedSoClient({
defaultOutputId: 'output-test',
});
mockedAgentPolicyService.list.mockResolvedValue(mockedAgentPolicyResolvedValue);
mockedAgentPolicyService.hasFleetServerIntegration.mockReturnValue(true);
await outputService.update(
soClient,
esClientMock,
'output-test',
{
type: 'kafka',
hosts: ['http://test:4343'],
is_default: true,
},
{
fromPreconfiguration: true,
}
);
expect(soClient.update).toBeCalledWith(expect.anything(), expect.anything(), {
type: 'kafka',
hosts: ['http://test:4343'],
is_default: true,
ca_sha256: null,
ca_trusted_fingerprint: null,
client_id: 'Elastic Agent',
compression: 'gzip',
compression_level: 4,
partition: 'hash',
timeout: 30,
version: '1.0.0',
broker_timeout: 10,
broker_ack_reliability: 'Wait for local commit',
broker_buffer_size: 256,
});
expect(mockedAgentPolicyService.update).toBeCalledWith(
expect.anything(),
expect.anything(),
'fleet_server_policy',
{ data_output_id: 'output-test' },
{ force: true }
);
});
});
describe('delete', () => {


@ -17,14 +17,29 @@ import type {
} from '@kbn/core/server';
import { SavedObjectsUtils } from '@kbn/core/server';
import type { AgentPolicy, NewOutput, Output, OutputSOAttributes } from '../types';
import _ from 'lodash';
import type {
NewOutput,
Output,
OutputSOAttributes,
AgentPolicy,
OutputSoKafkaAttributes,
} from '../types';
import {
AGENT_POLICY_SAVED_OBJECT_TYPE,
DEFAULT_OUTPUT,
DEFAULT_OUTPUT_ID,
OUTPUT_SAVED_OBJECT_TYPE,
} from '../constants';
import { outputType, SO_SEARCH_LIMIT } from '../../common/constants';
import {
SO_SEARCH_LIMIT,
outputType,
kafkaSaslMechanism,
kafkaPartitionType,
kafkaCompressionType,
kafkaAcknowledgeReliabilityLevel,
} from '../../common/constants';
import { normalizeHostsForAgents } from '../../common/services';
import {
FleetEncryptedSavedObjectEncryptionKeyRequired,
@ -32,6 +47,8 @@ import {
OutputUnauthorizedError,
} from '../errors';
import type { OutputType } from '../types';
import { agentPolicyService } from './agent_policy';
import { appContextService } from './app_context';
import { escapeSearchQueryPhrase } from './saved_object';
@ -151,11 +168,16 @@ async function findPoliciesWithFleetServer(
return [];
}
function validateLogstashOutputNotUsedInFleetServerPolicy(agentPolicies: AgentPolicy[]) {
function validateOutputNotUsedInFleetServerPolicy(
agentPolicies: AgentPolicy[],
dataOutputType: OutputType['Logstash'] | OutputType['Kafka']
) {
// Validate no policy with fleet server use that policy
for (const agentPolicy of agentPolicies) {
throw new OutputInvalidError(
`Logstash output cannot be used with Fleet Server integration in ${agentPolicy.name}. Please create a new ElasticSearch output.`
`${_.capitalize(dataOutputType)} output cannot be used with Fleet Server integration in ${
agentPolicy.name
}. Please create a new Elasticsearch output.`
);
}
}
@ -175,10 +197,13 @@ async function validateTypeChanges(
if (data.type === outputType.Logstash || originalOutput.type === outputType.Logstash) {
await validateLogstashOutputNotUsedInAPMPolicy(soClient, id, mergedIsDefault);
}
// prevent changing an ES output to logstash if it's used by fleet server policies
if (originalOutput.type === outputType.Elasticsearch && data?.type === outputType.Logstash) {
// prevent changing an ES output to logstash or kafka if it's used by fleet server policies
if (
originalOutput.type === outputType.Elasticsearch &&
(data?.type === outputType.Logstash || data?.type === outputType.Kafka)
) {
// Validate no policy with fleet server use that policy
validateLogstashOutputNotUsedInFleetServerPolicy(fleetServerPolicies);
validateOutputNotUsedInFleetServerPolicy(fleetServerPolicies, data.type);
}
await updateFleetServerPoliciesDataOutputId(
soClient,
@ -203,7 +228,7 @@ async function updateFleetServerPoliciesDataOutputId(
// if a logstash output is updated to become default
// if fleet server policies don't have data_output_id
// update them to use the default output
if (data?.type === outputType.Logstash && isDefault) {
if ((data?.type === outputType.Logstash || data?.type === outputType.Kafka) && isDefault) {
for (const policy of fleetServerPolicies) {
if (!policy.data_output_id) {
await agentPolicyService.update(
@ -440,6 +465,7 @@ class OutputService {
if (!output.config_yaml && output.shipper) {
data.shipper = null;
}
if (output.config_yaml) {
const configJs = safeLoad(output.config_yaml);
const isShipperDisabled = !configJs?.shipper || configJs?.shipper?.enabled === false;
@ -449,6 +475,54 @@ class OutputService {
}
}
if (output.type === outputType.Kafka && data.type === outputType.Kafka) {
if (!output.version) {
data.version = '1.0.0';
}
if (!output.compression) {
data.compression = kafkaCompressionType.Gzip;
}
if (
!output.compression ||
(output.compression === kafkaCompressionType.Gzip && !output.compression_level)
) {
data.compression_level = 4;
}
if (!output.client_id) {
data.client_id = 'Elastic Agent';
}
if (output.username && output.password && !output.sasl?.mechanism) {
data.sasl = {
mechanism: kafkaSaslMechanism.Plain,
};
}
if (!output.partition) {
data.partition = kafkaPartitionType.Hash;
}
if (output.partition === kafkaPartitionType.Random && !output.random?.group_events) {
data.random = {
group_events: 1,
};
}
if (output.partition === kafkaPartitionType.RoundRobin && !output.round_robin?.group_events) {
data.round_robin = {
group_events: 1,
};
}
if (!output.timeout) {
data.timeout = 30;
}
if (!output.broker_timeout) {
data.broker_timeout = 10;
}
if (!output.broker_ack_reliability) {
data.broker_ack_reliability = kafkaAcknowledgeReliabilityLevel.Commit;
}
if (!output.broker_buffer_size) {
data.broker_buffer_size = 256;
}
}
const id = options?.id ? outputIdToUuid(options.id) : SavedObjectsUtils.generateId();
auditLoggingService.writeCustomSoAuditLog({
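
To make the create-time behaviour above easier to scan: when a `kafka` output is created with optional fields omitted, the service backfills defaults. The standalone sketch below distills that logic; the function name and loose typing are illustrative (not Fleet exports), while the default values mirror the code and the unit tests in this PR.

```ts
// Illustrative distillation of the Kafka defaults applied in OutputService.create
// (the update path applies the same defaults when an output's type changes to kafka).
function applyKafkaCreateDefaults(output: Record<string, any>): Record<string, any> {
  const data = { ...output };
  if (!output.version) data.version = '1.0.0';
  if (!output.compression) data.compression = 'gzip';
  if (!output.compression || (output.compression === 'gzip' && !output.compression_level)) {
    data.compression_level = 4;
  }
  if (!output.client_id) data.client_id = 'Elastic Agent';
  if (output.username && output.password && !output.sasl?.mechanism) {
    data.sasl = { mechanism: 'PLAIN' };
  }
  if (!output.partition) data.partition = 'hash';
  if (output.partition === 'random' && !output.random?.group_events) {
    data.random = { group_events: 1 };
  }
  if (output.partition === 'round_robin' && !output.round_robin?.group_events) {
    data.round_robin = { group_events: 1 };
  }
  if (!output.timeout) data.timeout = 30;
  if (!output.broker_timeout) data.broker_timeout = 10;
  if (!output.broker_ack_reliability) data.broker_ack_reliability = 'Wait for local commit';
  if (!output.broker_buffer_size) data.broker_buffer_size = 256;
  return data;
}

// applyKafkaCreateDefaults({ name: 'Test', type: 'kafka' }) yields, among others:
// version '1.0.0', compression 'gzip', compression_level 4, client_id 'Elastic Agent',
// partition 'hash', timeout 30, broker_timeout 10,
// broker_ack_reliability 'Wait for local commit', broker_buffer_size 256.
```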
@ -629,16 +703,98 @@ class OutputService {
fromPreconfiguration
);
const removeKafkaFields = (target: Nullable<Partial<OutputSoKafkaAttributes>>) => {
target.version = null;
target.key = null;
target.compression = null;
target.compression_level = null;
target.client_id = null;
target.auth_type = null;
target.username = null;
target.password = null;
target.sasl = null;
target.partition = null;
target.random = null;
target.round_robin = null;
target.hash = null;
target.topics = null;
target.headers = null;
target.timeout = null;
target.broker_timeout = null;
target.broker_ack_reliability = null;
target.broker_buffer_size = null;
};
// If the output type changed
if (data.type && data.type !== originalOutput.type) {
if (
(data.type === outputType.Elasticsearch || data.type === outputType.Logstash) &&
originalOutput.type === outputType.Kafka
) {
removeKafkaFields(updateData as Nullable<OutputSoKafkaAttributes>);
}
if (data.type === outputType.Logstash) {
// remove ES specific field
updateData.ca_trusted_fingerprint = null;
updateData.ca_sha256 = null;
} else {
}
if (data.type === outputType.Elasticsearch) {
// remove logstash specific field
updateData.ssl = null;
}
if (data.type === outputType.Kafka && updateData.type === outputType.Kafka) {
updateData.ca_trusted_fingerprint = null;
updateData.ca_sha256 = null;
if (!data.version) {
updateData.version = '1.0.0';
}
if (!data.compression) {
updateData.compression = kafkaCompressionType.Gzip;
}
if (
!data.compression ||
(data.compression === kafkaCompressionType.Gzip && !data.compression_level)
) {
updateData.compression_level = 4;
}
if (!data.client_id) {
updateData.client_id = 'Elastic Agent';
}
if (data.username && data.password && !data.sasl?.mechanism) {
updateData.sasl = {
mechanism: kafkaSaslMechanism.Plain,
};
}
if (!data.partition) {
updateData.partition = kafkaPartitionType.Hash;
}
if (data.partition === kafkaPartitionType.Random && !data.random?.group_events) {
updateData.random = {
group_events: 1,
};
}
if (data.partition === kafkaPartitionType.RoundRobin && !data.round_robin?.group_events) {
updateData.round_robin = {
group_events: 1,
};
}
if (!data.timeout) {
updateData.timeout = 30;
}
if (!data.broker_timeout) {
updateData.broker_timeout = 10;
}
if (!data.broker_ack_reliability) {
updateData.broker_ack_reliability = kafkaAcknowledgeReliabilityLevel.Commit;
}
if (!data.broker_buffer_size) {
updateData.broker_buffer_size = 256;
}
}
}
if (data.ssl) {

View file

@ -7,7 +7,15 @@
import { schema } from '@kbn/config-schema';
import { outputType } from '../../../common/constants';
import {
kafkaAcknowledgeReliabilityLevel,
kafkaAuthType,
kafkaCompressionType,
kafkaPartitionType,
kafkaSaslMechanism,
kafkaTopicWhenType,
outputType,
} from '../../../common/constants';
export function validateLogstashHost(val: string) {
if (val.match(/^http([s]){0,1}:\/\//)) {
@ -25,23 +33,13 @@ export function validateLogstashHost(val: string) {
}
}
const OutputBaseSchema = {
/**
* Base schemas
*/
const BaseSchema = {
id: schema.maybe(schema.string()),
name: schema.string(),
type: schema.oneOf([
schema.literal(outputType.Elasticsearch),
schema.literal(outputType.Logstash),
]),
hosts: schema.conditional(
schema.siblingRef('type'),
schema.literal(outputType.Elasticsearch),
schema.arrayOf(schema.uri({ scheme: ['http', 'https'] }), {
minSize: 1,
}),
schema.arrayOf(schema.string({ validate: validateLogstashHost }), {
minSize: 1,
})
),
is_default: schema.boolean({ defaultValue: false }),
is_default_monitoring: schema.boolean({ defaultValue: false }),
ca_sha256: schema.maybe(schema.string()),
@ -71,49 +69,175 @@ const OutputBaseSchema = {
),
};
export const NewOutputSchema = schema.object({ ...OutputBaseSchema });
export const UpdateOutputSchema = schema.object({
const UpdateSchema = {
...BaseSchema,
name: schema.maybe(schema.string()),
type: schema.maybe(
schema.oneOf([schema.literal(outputType.Elasticsearch), schema.literal(outputType.Logstash)])
),
hosts: schema.maybe(
schema.oneOf([
schema.arrayOf(schema.uri({ scheme: ['http', 'https'] })),
schema.arrayOf(schema.string({ validate: validateLogstashHost })),
])
),
is_default: schema.maybe(schema.boolean()),
is_default_monitoring: schema.maybe(schema.boolean()),
ca_sha256: schema.maybe(schema.string()),
ca_trusted_fingerprint: schema.maybe(schema.string()),
config_yaml: schema.maybe(schema.string()),
ssl: schema.maybe(
schema.object({
certificate_authorities: schema.maybe(schema.arrayOf(schema.string())),
certificate: schema.maybe(schema.string()),
key: schema.maybe(schema.string()),
})
),
proxy_id: schema.nullable(schema.string()),
shipper: schema.maybe(
schema.object({
disk_queue_enabled: schema.nullable(schema.boolean({ defaultValue: false })),
disk_queue_path: schema.nullable(schema.string()),
disk_queue_max_size: schema.nullable(schema.number()),
disk_queue_encryption_enabled: schema.nullable(schema.boolean()),
disk_queue_compression_enabled: schema.nullable(schema.boolean()),
compression_level: schema.nullable(schema.number()),
loadbalance: schema.nullable(schema.boolean()),
mem_queue_events: schema.nullable(schema.number()),
queue_flush_timeout: schema.nullable(schema.number()),
max_batch_bytes: schema.nullable(schema.number()),
})
),
});
};
export const OutputSchema = schema.object({
...OutputBaseSchema,
id: schema.string(),
});
/**
* Elasticsearch schemas
*/
export const ElasticSearchSchema = {
...BaseSchema,
type: schema.literal(outputType.Elasticsearch),
hosts: schema.arrayOf(schema.uri({ scheme: ['http', 'https'] }), { minSize: 1 }),
};
const ElasticSearchUpdateSchema = {
...UpdateSchema,
type: schema.maybe(schema.literal(outputType.Elasticsearch)),
hosts: schema.maybe(schema.arrayOf(schema.uri({ scheme: ['http', 'https'] }), { minSize: 1 })),
};
/**
* Logstash schemas
*/
export const LogstashSchema = {
...BaseSchema,
type: schema.literal(outputType.Logstash),
hosts: schema.arrayOf(schema.string({ validate: validateLogstashHost }), { minSize: 1 }),
};
const LogstashUpdateSchema = {
...UpdateSchema,
type: schema.maybe(schema.literal(outputType.Logstash)),
hosts: schema.maybe(
schema.arrayOf(schema.string({ validate: validateLogstashHost }), { minSize: 1 })
),
};
/**
* Kafka schemas
*/
const KafkaTopicsSchema = schema.arrayOf(
schema.object({
topic: schema.string(),
when: schema.maybe(
schema.object({
type: schema.maybe(
schema.oneOf([
schema.literal(kafkaTopicWhenType.And),
schema.literal(kafkaTopicWhenType.Not),
schema.literal(kafkaTopicWhenType.Or),
schema.literal(kafkaTopicWhenType.Equals),
schema.literal(kafkaTopicWhenType.Contains),
schema.literal(kafkaTopicWhenType.Regexp),
schema.literal(kafkaTopicWhenType.HasFields),
schema.literal(kafkaTopicWhenType.Network),
schema.literal(kafkaTopicWhenType.Range),
])
),
condition: schema.maybe(schema.string()),
})
),
}),
{ minSize: 1 }
);
export const KafkaSchema = {
...BaseSchema,
type: schema.literal(outputType.Kafka),
hosts: schema.arrayOf(schema.uri({ scheme: ['http', 'https'] }), { minSize: 1 }),
version: schema.maybe(schema.string()),
key: schema.maybe(schema.string()),
compression: schema.maybe(
schema.oneOf([
schema.literal(kafkaCompressionType.Gzip),
schema.literal(kafkaCompressionType.Snappy),
schema.literal(kafkaCompressionType.Lz4),
])
),
compression_level: schema.conditional(
schema.siblingRef('compression'),
schema.string(),
schema.number(),
schema.never()
),
client_id: schema.maybe(schema.string()),
auth_type: schema.oneOf([
schema.literal(kafkaAuthType.Userpass),
schema.literal(kafkaAuthType.Ssl),
schema.literal(kafkaAuthType.Kerberos),
]),
username: schema.conditional(
schema.siblingRef('auth_type'),
kafkaAuthType.Userpass,
schema.string(),
schema.never()
),
password: schema.conditional(
schema.siblingRef('username'),
schema.string(),
schema.string(),
schema.never()
),
sasl: schema.maybe(
schema.object({
mechanism: schema.maybe(
schema.oneOf([
schema.literal(kafkaSaslMechanism.Plain),
schema.literal(kafkaSaslMechanism.ScramSha256),
schema.literal(kafkaSaslMechanism.ScramSha512),
])
),
})
),
partition: schema.maybe(
schema.oneOf([
schema.literal(kafkaPartitionType.Random),
schema.literal(kafkaPartitionType.RoundRobin),
schema.literal(kafkaPartitionType.Hash),
])
),
random: schema.maybe(schema.object({ group_events: schema.maybe(schema.number()) })),
round_robin: schema.maybe(schema.object({ group_events: schema.maybe(schema.number()) })),
hash: schema.maybe(
schema.object({ hash: schema.maybe(schema.string()), random: schema.maybe(schema.boolean()) })
),
topics: KafkaTopicsSchema,
headers: schema.maybe(
schema.arrayOf(schema.object({ key: schema.string(), value: schema.string() }))
),
timeout: schema.maybe(schema.number()),
broker_timeout: schema.maybe(schema.number()),
broker_buffer_size: schema.maybe(schema.number()),
broker_ack_reliability: schema.maybe(
schema.oneOf([
schema.literal(kafkaAcknowledgeReliabilityLevel.Commit),
schema.literal(kafkaAcknowledgeReliabilityLevel.Replica),
schema.literal(kafkaAcknowledgeReliabilityLevel.DoNotWait),
])
),
};
const KafkaUpdateSchema = {
...UpdateSchema,
...KafkaSchema,
type: schema.maybe(schema.literal(outputType.Kafka)),
hosts: schema.maybe(schema.arrayOf(schema.uri({ scheme: ['http', 'https'] }), { minSize: 1 })),
auth_type: schema.maybe(
schema.oneOf([
schema.literal(kafkaAuthType.Userpass),
schema.literal(kafkaAuthType.Ssl),
schema.literal(kafkaAuthType.Kerberos),
])
),
topics: schema.maybe(KafkaTopicsSchema),
};
export const OutputSchema = schema.oneOf([
schema.object({ ...ElasticSearchSchema }),
schema.object({ ...LogstashSchema }),
schema.object({ ...KafkaSchema }),
]);
export const UpdateOutputSchema = schema.oneOf([
schema.object({ ...ElasticSearchUpdateSchema }),
schema.object({ ...LogstashUpdateSchema }),
schema.object({ ...KafkaUpdateSchema }),
]);
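For reference, a minimal sketch (illustration only, not part of this change) of exercising the new Kafka variant of `OutputSchema` directly with `@kbn/config-schema`; the relative import path, host and credentials below are assumptions:

import { OutputSchema } from './output'; // assumed path to the schemas above

// Validation throws if required Kafka fields are missing (e.g. `topics`) or if
// `username`/`password` are supplied without `auth_type: 'user_pass'`.
const kafkaOutputBody = OutputSchema.validate({
  name: 'my-kafka-output',
  type: 'kafka',
  hosts: ['https://broker.example.com:9092'],
  auth_type: 'user_pass',
  username: 'fleet',
  password: 'changeme',
  topics: [{ topic: 'logs' }],
});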

View file

@ -11,9 +11,10 @@ import semverValid from 'semver/functions/valid';
import { PRECONFIGURATION_LATEST_KEYWORD } from '../../constants';
import type { PreconfiguredOutput } from '../../../common/types';
import { ElasticSearchSchema, KafkaSchema, LogstashSchema } from './output';
import { AgentPolicyBaseSchema } from './agent_policy';
import { NamespaceSchema } from './package_policy';
import { NewOutputSchema } from './output';
const varsSchema = schema.maybe(
schema.arrayOf(
@ -73,17 +74,20 @@ function validatePreconfiguredOutputs(outputs: PreconfiguredOutput[]) {
}
}
const PreconfiguredOutputBaseSchema = {
id: schema.string(),
config: schema.maybe(schema.object({}, { unknowns: 'allow' })),
config_yaml: schema.never(),
allow_edit: schema.maybe(schema.arrayOf(schema.string())),
};
export const PreconfiguredOutputsSchema = schema.arrayOf(
NewOutputSchema.extends({
id: schema.string(),
config: schema.maybe(schema.object({}, { unknowns: 'allow' })),
config_yaml: schema.never(),
allow_edit: schema.maybe(schema.arrayOf(schema.string())),
}),
{
defaultValue: [],
validate: validatePreconfiguredOutputs,
}
schema.oneOf([
schema.object({ ...ElasticSearchSchema }).extends(PreconfiguredOutputBaseSchema),
schema.object({ ...LogstashSchema }).extends(PreconfiguredOutputBaseSchema),
schema.object({ ...KafkaSchema }).extends(PreconfiguredOutputBaseSchema),
]),
{ defaultValue: [], validate: validatePreconfiguredOutputs }
);
export const PreconfiguredFleetServerHostsSchema = schema.arrayOf(

View file

@ -7,7 +7,7 @@
import { schema } from '@kbn/config-schema';
import { NewOutputSchema, UpdateOutputSchema } from '../models';
import { OutputSchema, UpdateOutputSchema } from '../models';
export const GetOneOutputRequestSchema = {
params: schema.object({
@ -24,7 +24,7 @@ export const DeleteOutputRequestSchema = {
export const GetOutputsRequestSchema = {};
export const PostOutputRequestSchema = {
body: NewOutputSchema,
body: OutputSchema,
};
export const PutOutputRequestSchema = {

View file

@ -13,6 +13,7 @@ import type {
AgentMetadata,
OutputType,
ShipperOutput,
KafkaAcknowledgeReliabilityLevel,
} from '../../common/types';
import type { AgentType, FleetServerAgentComponent } from '../../common/types/models';
@ -23,6 +24,12 @@ import type {
PackagePolicyConfigRecord,
} from '../../common/types/models/package_policy';
import type { PolicySecretReference } from '../../common/types/models/secret';
import type { KafkaAuthType, KafkaCompressionType } from '../../common/types';
import type {
KafkaPartitionType,
KafkaSaslMechanism,
KafkaTopicWhenType,
} from '../../common/types';
export type AgentPolicyStatus = typeof agentPolicyStatuses;
@ -120,11 +127,10 @@ export interface PackagePolicySOAttributes {
agents?: number;
}
export interface OutputSOAttributes {
interface OutputSoBaseAttributes {
is_default: boolean;
is_default_monitoring: boolean;
name: string;
type: ValueOf<OutputType>;
hosts?: string[];
ca_sha256?: string | null;
ca_trusted_fingerprint?: string | null;
@ -137,6 +143,60 @@ export interface OutputSOAttributes {
ssl?: string | null; // encrypted ssl field
}
interface OutputSoElasticsearchAttributes extends OutputSoBaseAttributes {
type: OutputType['Elasticsearch'];
}
interface OutputSoLogstashAttributes extends OutputSoBaseAttributes {
type: OutputType['Logstash'];
}
export interface OutputSoKafkaAttributes extends OutputSoBaseAttributes {
type: OutputType['Kafka'];
client_id?: string;
version?: string;
key?: string;
compression?: ValueOf<KafkaCompressionType>;
compression_level?: number;
auth_type?: ValueOf<KafkaAuthType>;
username?: string;
password?: string;
sasl?: {
mechanism?: ValueOf<KafkaSaslMechanism>;
};
partition?: ValueOf<KafkaPartitionType>;
random?: {
group_events?: number;
};
round_robin?: {
group_events?: number;
};
hash?: {
hash?: string;
random?: boolean;
};
topics?: Array<{
topic: string;
when?: {
type?: ValueOf<KafkaTopicWhenType>;
condition?: string;
};
}>;
headers?: Array<{
key: string;
value: string;
}>;
timeout?: number;
broker_timeout?: number;
broker_buffer_size?: number;
broker_ack_reliability?: ValueOf<KafkaAcknowledgeReliabilityLevel>;
}
export type OutputSOAttributes =
| OutputSoElasticsearchAttributes
| OutputSoLogstashAttributes
| OutputSoKafkaAttributes;
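// Illustrative sketch (not part of this change): `type` discriminates the union above,
// so Kafka-only attributes can be read after a simple narrowing check.
function isKafkaOutputSO(so: OutputSOAttributes): so is OutputSoKafkaAttributes {
  return so.type === 'kafka';
}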
export interface SettingsSOAttributes {
prerelease_integrations_enabled: boolean;
has_seen_add_data_notice?: boolean;

View file

@ -190,6 +190,27 @@ export default function (providerContext: FtrProviderContext) {
);
});
it('should not allow to update a default ES output to Kafka', async function () {
const { body } = await supertest
.put(`/api/fleet/outputs/${defaultOutputId}`)
.set('kbn-xsrf', 'xxxx')
.send({
name: 'My Kafka Output',
type: 'kafka',
hosts: ['https://test.fr'],
auth_type: 'user_pass',
username: 'user',
password: 'pass',
is_default: true,
is_default_monitoring: true,
topics: [{ topic: 'topic1' }],
})
.expect(400);
expect(body.message).to.eql(
'Kafka output cannot be used with Fleet Server integration in Fleet Server policy 1. Please create a new ElasticSearch output.'
);
});
it('should allow to update a default ES output if keeping it ES', async function () {
await supertest
.put(`/api/fleet/outputs/${defaultOutputId}`)
@ -197,7 +218,7 @@ export default function (providerContext: FtrProviderContext) {
.send({
name: 'Updated Default ES Output',
type: 'elasticsearch',
hosts: ['test.fr:443'],
hosts: ['http://test.fr:443'],
})
.expect(200);
});
@ -245,6 +266,38 @@ export default function (providerContext: FtrProviderContext) {
expect(updatedFleetServerPolicyWithCustomOutput.data_output_id === ESOutputId);
});
it('should allow to update a non-default ES output to kafka', async function () {
const { body: postResponse } = await supertest
.post(`/api/fleet/outputs`)
.set('kbn-xsrf', 'xxxx')
.send({
name: 'Elasticsearch output',
type: 'elasticsearch',
hosts: ['https://test.fr:443'],
})
.expect(200);
const { id: elasticsearchOutputId } = postResponse.item;
await supertest
.put(`/api/fleet/outputs/${elasticsearchOutputId}`)
.set('kbn-xsrf', 'xxxx')
.send({
name: 'A Kafka Output',
type: 'kafka',
})
.expect(200);
const { body } = await supertest.get(`/api/fleet/agent_policies/${fleetServerPolicyId}`);
const updatedFleetServerPolicy = body.item;
expect(updatedFleetServerPolicy.data_output_id === defaultOutputId);
const { body: bodyWithOutput } = await supertest.get(
`/api/fleet/agent_policies/${fleetServerPolicyWithCustomOutputId}`
);
const updatedFleetServerPolicyWithCustomOutput = bodyWithOutput.item;
expect(updatedFleetServerPolicyWithCustomOutput.data_output_id === ESOutputId);
});
it('should allow to update a default logstash output to logstash and fleet server policies should be updated', async function () {
const { body: postResponse } = await supertest
.post(`/api/fleet/outputs`)
@ -505,6 +558,99 @@ export default function (providerContext: FtrProviderContext) {
);
});
it('should allow to create a new kafka output', async function () {
const { body: postResponse } = await supertest
.post(`/api/fleet/outputs`)
.set('kbn-xsrf', 'xxxx')
.send({
name: 'My Kafka Output',
type: 'kafka',
hosts: ['https://test.fr'],
auth_type: 'user_pass',
username: 'user',
password: 'pass',
topics: [{ topic: 'topic1' }],
})
.expect(200);
const { id: _, ...itemWithoutId } = postResponse.item;
expect(itemWithoutId).to.eql({
is_default: false,
is_default_monitoring: false,
name: 'My Kafka Output',
type: 'kafka',
hosts: ['https://test.fr'],
auth_type: 'user_pass',
username: 'user',
password: 'pass',
topics: [{ topic: 'topic1' }],
broker_timeout: 10,
broker_ack_reliability: 'Wait for local commit',
broker_buffer_size: 256,
client_id: 'Elastic Agent',
compression: 'gzip',
compression_level: 4,
sasl: {
mechanism: 'PLAIN',
},
timeout: 30,
partition: 'hash',
version: '1.0.0',
});
});
it('should allow to create a new kafka default output and fleet server policies should not change', async function () {
const { body: postResponse } = await supertest
.post(`/api/fleet/outputs`)
.set('kbn-xsrf', 'xxxx')
.send({
name: 'Default Kafka Output',
type: 'kafka',
hosts: ['https://test.fr'],
auth_type: 'user_pass',
username: 'user',
password: 'pass',
topics: [{ topic: 'topic1' }],
is_default: true,
})
.expect(200);
const { id: _, ...itemWithoutId } = postResponse.item;
expect(itemWithoutId).to.eql({
name: 'Default Kafka Output',
type: 'kafka',
hosts: ['https://test.fr'],
auth_type: 'user_pass',
username: 'user',
password: 'pass',
topics: [{ topic: 'topic1' }],
is_default: true,
is_default_monitoring: false,
broker_timeout: 10,
broker_ack_reliability: 'Wait for local commit',
broker_buffer_size: 256,
client_id: 'Elastic Agent',
compression: 'gzip',
compression_level: 4,
sasl: {
mechanism: 'PLAIN',
},
timeout: 30,
partition: 'hash',
version: '1.0.0',
});
const { body } = await supertest.get(`/api/fleet/agent_policies/${fleetServerPolicyId}`);
const updatedFleetServerPolicy = body.item;
expect(updatedFleetServerPolicy.data_output_id === defaultOutputId);
const { body: bodyWithOutput } = await supertest.get(
`/api/fleet/agent_policies/${fleetServerPolicyWithCustomOutputId}`
);
const updatedFleetServerPolicyWithCustomOutput = bodyWithOutput.item;
expect(updatedFleetServerPolicyWithCustomOutput.data_output_id === ESOutputId);
});
it('should toggle the default output when creating a new one', async function () {
await supertest
.post(`/api/fleet/outputs`)
@ -693,77 +839,166 @@ export default function (providerContext: FtrProviderContext) {
const defaultOutputs = outputs.filter((o: any) => o.is_default_monitoring);
expect(defaultOutputs[0].shipper).to.equal(null);
});
it('should allow to create a kafka output with the shipper values', async function () {
await supertest
.post(`/api/fleet/outputs`)
.set('kbn-xsrf', 'xxxx')
.send({
name: 'Kafka Output',
type: 'kafka',
hosts: ['https://test.fr'],
auth_type: 'user_pass',
username: 'user',
password: 'pass',
topics: [{ topic: 'topic1' }],
config_yaml: 'shipper: {}',
shipper: {
disk_queue_enabled: true,
disk_queue_path: 'path/to/disk/queue',
disk_queue_encryption_enabled: true,
},
})
.expect(200);
const {
body: { items: outputs },
} = await supertest.get(`/api/fleet/outputs`).expect(200);
const newOutput = outputs.filter((o: any) => o.name === 'Kafka Output');
expect(newOutput[0].shipper).to.eql({
compression_level: null,
disk_queue_compression_enabled: null,
disk_queue_enabled: true,
disk_queue_encryption_enabled: true,
disk_queue_max_size: null,
disk_queue_path: 'path/to/disk/queue',
loadbalance: null,
max_batch_bytes: null,
mem_queue_events: null,
queue_flush_timeout: null,
});
});
});
describe('DELETE /outputs/{outputId}', () => {
describe('Elasticsearch output', () => {
let outputId: string;
let defaultOutputIdToDelete: string;
let defaultMonitoringOutputId: string;
before(async () => {
const { body: postResponse } = await supertest
.post(`/api/fleet/outputs`)
.set('kbn-xsrf', 'xxxx')
.send({
name: 'Output to delete test',
type: 'elasticsearch',
hosts: ['https://test.fr'],
})
.expect(200);
outputId = postResponse.item.id;
const { body: defaultOutputPostResponse } = await supertest
.post(`/api/fleet/outputs`)
.set('kbn-xsrf', 'xxxx')
.send({
name: 'Default Output to delete test',
type: 'elasticsearch',
hosts: ['https://test.fr'],
is_default: true,
})
.expect(200);
defaultOutputIdToDelete = defaultOutputPostResponse.item.id;
const { body: defaultMonitoringOutputPostResponse } = await supertest
.post(`/api/fleet/outputs`)
.set('kbn-xsrf', 'xxxx')
.send({
name: 'Default Output to delete test',
type: 'elasticsearch',
hosts: ['https://test.fr'],
is_default_monitoring: true,
})
.expect(200);
defaultMonitoringOutputId = defaultMonitoringOutputPostResponse.item.id;
});
it('should return a 400 when deleting a default output ', async function () {
await supertest
.delete(`/api/fleet/outputs/${defaultOutputIdToDelete}`)
.set('kbn-xsrf', 'xxxx')
.expect(400);
});
it('should return a 400 when deleting a default output ', async function () {
await supertest
.delete(`/api/fleet/outputs/${defaultMonitoringOutputId}`)
.set('kbn-xsrf', 'xxxx')
.expect(400);
});
it('should return a 404 when deleting a non existing output ', async function () {
await supertest
.delete(`/api/fleet/outputs/idonotexists`)
.set('kbn-xsrf', 'xxxx')
.expect(404);
});
it('should allow to delete an output ', async function () {
const { body: deleteResponse } = await supertest
.delete(`/api/fleet/outputs/${outputId}`)
.set('kbn-xsrf', 'xxxx')
.expect(200);
expect(deleteResponse.id).to.eql(outputId);
});
});
describe('Kafka output', () => {
let outputId: string;
let defaultOutputIdToDelete: string;
const kafkaOutputPayload = {
name: 'Output to delete test',
type: 'kafka',
hosts: ['https://test.fr'],
auth_type: 'user_pass',
username: 'user',
password: 'pass',
is_default: true,
topics: [{ topic: 'topic1' }],
};
before(async () => {
const { body: postResponse } = await supertest
.post(`/api/fleet/outputs`)
.set('kbn-xsrf', 'xxxx')
.send(kafkaOutputPayload)
.expect(200);
outputId = postResponse.item.id;
const { body: defaultOutputPostResponse } = await supertest
.post(`/api/fleet/outputs`)
.set('kbn-xsrf', 'xxxx')
.send({ ...kafkaOutputPayload, name: 'Default Output to delete test' })
.expect(200);
defaultOutputIdToDelete = defaultOutputPostResponse.item.id;
});
it('should return a 400 when deleting a default output ', async function () {
await supertest
.delete(`/api/fleet/outputs/${defaultOutputIdToDelete}`)
.set('kbn-xsrf', 'xxxx')
.expect(400);
});
it('should allow to delete an output ', async function () {
const { body: deleteResponse } = await supertest
.delete(`/api/fleet/outputs/${outputId}`)
.set('kbn-xsrf', 'xxxx')
.expect(200);
expect(deleteResponse.id).to.eql(outputId);
});
});
});
});