feat(streams): add significant events and queries API (#216221)

This commit is contained in:
Kevin Delemme 2025-04-07 08:10:25 -04:00 committed by GitHub
parent 6ef920de90
commit e71ea24e0f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
51 changed files with 3168 additions and 357 deletions

View file

@ -46684,10 +46684,56 @@
"type": "string"
},
"type": "array"
},
"queries": {
"items": {
"allOf": [
{
"properties": {
"id": {
"minLength": 1,
"type": "string"
},
"title": {
"minLength": 1,
"type": "string"
}
},
"required": [
"id",
"title"
],
"type": "object"
},
{
"properties": {
"kql": {
"additionalProperties": false,
"properties": {
"query": {
"minLength": 1,
"type": "string"
}
},
"required": [
"query"
],
"type": "object"
}
},
"required": [
"kql"
],
"type": "object"
}
]
},
"type": "array"
}
},
"required": [
"dashboards"
"dashboards",
"queries"
],
"type": "object"
},
@ -48405,10 +48451,56 @@
"type": "string"
},
"type": "array"
},
"queries": {
"items": {
"allOf": [
{
"properties": {
"id": {
"minLength": 1,
"type": "string"
},
"title": {
"minLength": 1,
"type": "string"
}
},
"required": [
"id",
"title"
],
"type": "object"
},
{
"properties": {
"kql": {
"additionalProperties": false,
"properties": {
"query": {
"minLength": 1,
"type": "string"
}
},
"required": [
"query"
],
"type": "object"
}
},
"required": [
"kql"
],
"type": "object"
}
]
},
"type": "array"
}
},
"required": [
"dashboards"
"dashboards",
"queries"
],
"type": "object"
},
@ -49875,10 +49967,56 @@
"type": "string"
},
"type": "array"
},
"queries": {
"items": {
"allOf": [
{
"properties": {
"id": {
"minLength": 1,
"type": "string"
},
"title": {
"minLength": 1,
"type": "string"
}
},
"required": [
"id",
"title"
],
"type": "object"
},
{
"properties": {
"kql": {
"additionalProperties": false,
"properties": {
"query": {
"minLength": 1,
"type": "string"
}
},
"required": [
"query"
],
"type": "object"
}
},
"required": [
"kql"
],
"type": "object"
}
]
},
"type": "array"
}
},
"required": [
"dashboards"
"dashboards",
"queries"
],
"type": "object"
},
@ -53818,6 +53956,385 @@
],
"x-state": "Technical Preview"
}
},
"/api/streams/{name}/queries": {
"get": {
"description": "Fetches all queries linked to a stream that are visible to the current user in the current space.",
"operationId": "get-streams-name-queries",
"parameters": [
{
"in": "path",
"name": "name",
"required": true,
"schema": {
"type": "string"
}
}
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"anyOf": [
{
"additionalProperties": false,
"properties": {},
"type": "object"
},
{
"enum": [
"null"
],
"nullable": true
},
{
"not": {}
}
]
}
}
}
},
"responses": {},
"summary": "Get stream queries",
"tags": [
"streams"
],
"x-state": "Technical Preview"
}
},
"/api/streams/{name}/queries/_bulk": {
"post": {
"description": "Bulk update queries of a stream. Can add new queries and delete existing ones.",
"operationId": "post-streams-name-queries-bulk",
"parameters": [
{
"description": "A required header to protect against CSRF attacks",
"in": "header",
"name": "kbn-xsrf",
"required": true,
"schema": {
"example": "true",
"type": "string"
}
},
{
"in": "path",
"name": "name",
"required": true,
"schema": {
"type": "string"
}
}
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"additionalProperties": false,
"properties": {
"operations": {
"items": {
"anyOf": [
{
"additionalProperties": false,
"properties": {
"index": {
"allOf": [
{
"properties": {
"id": {
"minLength": 1,
"type": "string"
},
"title": {
"minLength": 1,
"type": "string"
}
},
"required": [
"id",
"title"
],
"type": "object"
},
{
"properties": {
"kql": {
"additionalProperties": false,
"properties": {
"query": {
"minLength": 1,
"type": "string"
}
},
"required": [
"query"
],
"type": "object"
}
},
"required": [
"kql"
],
"type": "object"
}
]
}
},
"required": [
"index"
],
"type": "object"
},
{
"additionalProperties": false,
"properties": {
"delete": {
"additionalProperties": false,
"properties": {
"id": {
"type": "string"
}
},
"required": [
"id"
],
"type": "object"
}
},
"required": [
"delete"
],
"type": "object"
}
]
},
"type": "array"
}
},
"required": [
"operations"
],
"type": "object"
}
}
}
},
"responses": {},
"summary": "Bulk update queries",
"tags": [
"streams"
],
"x-state": "Technical Preview"
}
},
"/api/streams/{name}/queries/{queryId}": {
"delete": {
"description": "Remove a query from a stream. Noop if the query is not found on the stream.",
"operationId": "delete-streams-name-queries-queryid",
"parameters": [
{
"description": "A required header to protect against CSRF attacks",
"in": "header",
"name": "kbn-xsrf",
"required": true,
"schema": {
"example": "true",
"type": "string"
}
},
{
"in": "path",
"name": "name",
"required": true,
"schema": {
"type": "string"
}
},
{
"in": "path",
"name": "queryId",
"required": true,
"schema": {
"type": "string"
}
}
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"anyOf": [
{
"additionalProperties": false,
"properties": {},
"type": "object"
},
{
"enum": [
"null"
],
"nullable": true
},
{
"not": {}
}
]
}
}
}
},
"responses": {},
"summary": "Remove a query from a stream",
"tags": [
"streams"
],
"x-state": "Technical Preview"
},
"put": {
"description": "Adds a query to a stream. Noop if the query is already present on the stream.",
"operationId": "put-streams-name-queries-queryid",
"parameters": [
{
"description": "A required header to protect against CSRF attacks",
"in": "header",
"name": "kbn-xsrf",
"required": true,
"schema": {
"example": "true",
"type": "string"
}
},
{
"in": "path",
"name": "name",
"required": true,
"schema": {
"type": "string"
}
},
{
"in": "path",
"name": "queryId",
"required": true,
"schema": {
"type": "string"
}
}
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"additionalProperties": false,
"properties": {
"kql": {
"additionalProperties": false,
"properties": {
"query": {
"minLength": 1,
"type": "string"
}
},
"required": [
"query"
],
"type": "object"
},
"title": {
"minLength": 1,
"type": "string"
}
},
"required": [
"title",
"kql"
],
"type": "object"
}
}
}
},
"responses": {},
"summary": "Upsert a query to a stream",
"tags": [
"streams"
],
"x-state": "Technical Preview"
}
},
"/api/streams/{name}/significant_events": {
"get": {
"description": "Read the significant events",
"operationId": "get-streams-name-significant-events",
"parameters": [
{
"in": "path",
"name": "name",
"required": true,
"schema": {
"type": "string"
}
},
{
"in": "query",
"name": "from",
"required": true,
"schema": {
"format": "date-time",
"type": "string"
}
},
{
"in": "query",
"name": "to",
"required": true,
"schema": {
"format": "date-time",
"type": "string"
}
},
{
"in": "query",
"name": "bucketSize",
"required": true,
"schema": {
"type": "string"
}
}
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"anyOf": [
{
"additionalProperties": false,
"properties": {},
"type": "object"
},
{
"enum": [
"null"
],
"nullable": true
},
{
"not": {}
}
]
}
}
}
},
"responses": {},
"summary": "Read the significant events",
"tags": [
"streams"
],
"x-state": "Technical Preview"
}
}
},
"security": [

View file

@ -46275,10 +46275,56 @@
"type": "string"
},
"type": "array"
},
"queries": {
"items": {
"allOf": [
{
"properties": {
"id": {
"minLength": 1,
"type": "string"
},
"title": {
"minLength": 1,
"type": "string"
}
},
"required": [
"id",
"title"
],
"type": "object"
},
{
"properties": {
"kql": {
"additionalProperties": false,
"properties": {
"query": {
"minLength": 1,
"type": "string"
}
},
"required": [
"query"
],
"type": "object"
}
},
"required": [
"kql"
],
"type": "object"
}
]
},
"type": "array"
}
},
"required": [
"dashboards"
"dashboards",
"queries"
],
"type": "object"
},
@ -47996,10 +48042,56 @@
"type": "string"
},
"type": "array"
},
"queries": {
"items": {
"allOf": [
{
"properties": {
"id": {
"minLength": 1,
"type": "string"
},
"title": {
"minLength": 1,
"type": "string"
}
},
"required": [
"id",
"title"
],
"type": "object"
},
{
"properties": {
"kql": {
"additionalProperties": false,
"properties": {
"query": {
"minLength": 1,
"type": "string"
}
},
"required": [
"query"
],
"type": "object"
}
},
"required": [
"kql"
],
"type": "object"
}
]
},
"type": "array"
}
},
"required": [
"dashboards"
"dashboards",
"queries"
],
"type": "object"
},
@ -49466,10 +49558,56 @@
"type": "string"
},
"type": "array"
},
"queries": {
"items": {
"allOf": [
{
"properties": {
"id": {
"minLength": 1,
"type": "string"
},
"title": {
"minLength": 1,
"type": "string"
}
},
"required": [
"id",
"title"
],
"type": "object"
},
{
"properties": {
"kql": {
"additionalProperties": false,
"properties": {
"query": {
"minLength": 1,
"type": "string"
}
},
"required": [
"query"
],
"type": "object"
}
},
"required": [
"kql"
],
"type": "object"
}
]
},
"type": "array"
}
},
"required": [
"dashboards"
"dashboards",
"queries"
],
"type": "object"
},
@ -53409,6 +53547,385 @@
],
"x-state": "Technical Preview"
}
},
"/api/streams/{name}/queries": {
"get": {
"description": "Fetches all queries linked to a stream that are visible to the current user in the current space.",
"operationId": "get-streams-name-queries",
"parameters": [
{
"in": "path",
"name": "name",
"required": true,
"schema": {
"type": "string"
}
}
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"anyOf": [
{
"additionalProperties": false,
"properties": {},
"type": "object"
},
{
"enum": [
"null"
],
"nullable": true
},
{
"not": {}
}
]
}
}
}
},
"responses": {},
"summary": "Get stream queries",
"tags": [
"streams"
],
"x-state": "Technical Preview"
}
},
"/api/streams/{name}/queries/_bulk": {
"post": {
"description": "Bulk update queries of a stream. Can add new queries and delete existing ones.",
"operationId": "post-streams-name-queries-bulk",
"parameters": [
{
"description": "A required header to protect against CSRF attacks",
"in": "header",
"name": "kbn-xsrf",
"required": true,
"schema": {
"example": "true",
"type": "string"
}
},
{
"in": "path",
"name": "name",
"required": true,
"schema": {
"type": "string"
}
}
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"additionalProperties": false,
"properties": {
"operations": {
"items": {
"anyOf": [
{
"additionalProperties": false,
"properties": {
"index": {
"allOf": [
{
"properties": {
"id": {
"minLength": 1,
"type": "string"
},
"title": {
"minLength": 1,
"type": "string"
}
},
"required": [
"id",
"title"
],
"type": "object"
},
{
"properties": {
"kql": {
"additionalProperties": false,
"properties": {
"query": {
"minLength": 1,
"type": "string"
}
},
"required": [
"query"
],
"type": "object"
}
},
"required": [
"kql"
],
"type": "object"
}
]
}
},
"required": [
"index"
],
"type": "object"
},
{
"additionalProperties": false,
"properties": {
"delete": {
"additionalProperties": false,
"properties": {
"id": {
"type": "string"
}
},
"required": [
"id"
],
"type": "object"
}
},
"required": [
"delete"
],
"type": "object"
}
]
},
"type": "array"
}
},
"required": [
"operations"
],
"type": "object"
}
}
}
},
"responses": {},
"summary": "Bulk update queries",
"tags": [
"streams"
],
"x-state": "Technical Preview"
}
},
"/api/streams/{name}/queries/{queryId}": {
"delete": {
"description": "Remove a query from a stream. Noop if the query is not found on the stream.",
"operationId": "delete-streams-name-queries-queryid",
"parameters": [
{
"description": "A required header to protect against CSRF attacks",
"in": "header",
"name": "kbn-xsrf",
"required": true,
"schema": {
"example": "true",
"type": "string"
}
},
{
"in": "path",
"name": "name",
"required": true,
"schema": {
"type": "string"
}
},
{
"in": "path",
"name": "queryId",
"required": true,
"schema": {
"type": "string"
}
}
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"anyOf": [
{
"additionalProperties": false,
"properties": {},
"type": "object"
},
{
"enum": [
"null"
],
"nullable": true
},
{
"not": {}
}
]
}
}
}
},
"responses": {},
"summary": "Remove a query from a stream",
"tags": [
"streams"
],
"x-state": "Technical Preview"
},
"put": {
"description": "Adds a query to a stream. Noop if the query is already present on the stream.",
"operationId": "put-streams-name-queries-queryid",
"parameters": [
{
"description": "A required header to protect against CSRF attacks",
"in": "header",
"name": "kbn-xsrf",
"required": true,
"schema": {
"example": "true",
"type": "string"
}
},
{
"in": "path",
"name": "name",
"required": true,
"schema": {
"type": "string"
}
},
{
"in": "path",
"name": "queryId",
"required": true,
"schema": {
"type": "string"
}
}
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"additionalProperties": false,
"properties": {
"kql": {
"additionalProperties": false,
"properties": {
"query": {
"minLength": 1,
"type": "string"
}
},
"required": [
"query"
],
"type": "object"
},
"title": {
"minLength": 1,
"type": "string"
}
},
"required": [
"title",
"kql"
],
"type": "object"
}
}
}
},
"responses": {},
"summary": "Upsert a query to a stream",
"tags": [
"streams"
],
"x-state": "Technical Preview"
}
},
"/api/streams/{name}/significant_events": {
"get": {
"description": "Read the significant events",
"operationId": "get-streams-name-significant-events",
"parameters": [
{
"in": "path",
"name": "name",
"required": true,
"schema": {
"type": "string"
}
},
{
"in": "query",
"name": "from",
"required": true,
"schema": {
"format": "date-time",
"type": "string"
}
},
{
"in": "query",
"name": "to",
"required": true,
"schema": {
"format": "date-time",
"type": "string"
}
},
{
"in": "query",
"name": "bucketSize",
"required": true,
"schema": {
"type": "string"
}
}
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"anyOf": [
{
"additionalProperties": false,
"properties": {},
"type": "object"
},
{
"enum": [
"null"
],
"nullable": true
},
{
"not": {}
}
]
}
}
}
},
"responses": {},
"summary": "Read the significant events",
"tags": [
"streams"
],
"x-state": "Technical Preview"
}
}
},
"security": [

View file

@ -43191,8 +43191,37 @@ paths:
minLength: 1
type: string
type: array
queries:
items:
allOf:
- type: object
properties:
id:
minLength: 1
type: string
title:
minLength: 1
type: string
required:
- id
- title
- type: object
properties:
kql:
additionalProperties: false
type: object
properties:
query:
minLength: 1
type: string
required:
- query
required:
- kql
type: array
required:
- dashboards
- queries
- type: object
properties:
stream:
@ -44251,8 +44280,37 @@ paths:
minLength: 1
type: string
type: array
queries:
items:
allOf:
- type: object
properties:
id:
minLength: 1
type: string
title:
minLength: 1
type: string
required:
- id
- title
- type: object
properties:
kql:
additionalProperties: false
type: object
properties:
query:
minLength: 1
type: string
required:
- query
required:
- kql
type: array
required:
- dashboards
- queries
- type: object
properties:
stream:
@ -45164,8 +45222,37 @@ paths:
minLength: 1
type: string
type: array
queries:
items:
allOf:
- type: object
properties:
id:
minLength: 1
type: string
title:
minLength: 1
type: string
required:
- id
- title
- type: object
properties:
kql:
additionalProperties: false
type: object
properties:
query:
minLength: 1
type: string
required:
- query
required:
- kql
type: array
required:
- dashboards
- queries
- type: object
properties:
stream:
@ -47607,6 +47694,243 @@ paths:
tags:
- streams
x-state: Technical Preview
/api/streams/{name}/queries:
get:
description: Fetches all queries linked to a stream that are visible to the current user in the current space.
operationId: get-streams-name-queries
parameters:
- in: path
name: name
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
anyOf:
- additionalProperties: false
type: object
properties: {}
- enum:
- 'null'
nullable: true
- not: {}
responses: {}
summary: Get stream queries
tags:
- streams
x-state: Technical Preview
/api/streams/{name}/queries/_bulk:
post:
description: Bulk update queries of a stream. Can add new queries and delete existing ones.
operationId: post-streams-name-queries-bulk
parameters:
- description: A required header to protect against CSRF attacks
in: header
name: kbn-xsrf
required: true
schema:
example: 'true'
type: string
- in: path
name: name
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
additionalProperties: false
type: object
properties:
operations:
items:
anyOf:
- additionalProperties: false
type: object
properties:
index:
allOf:
- type: object
properties:
id:
minLength: 1
type: string
title:
minLength: 1
type: string
required:
- id
- title
- type: object
properties:
kql:
additionalProperties: false
type: object
properties:
query:
minLength: 1
type: string
required:
- query
required:
- kql
required:
- index
- additionalProperties: false
type: object
properties:
delete:
additionalProperties: false
type: object
properties:
id:
type: string
required:
- id
required:
- delete
type: array
required:
- operations
responses: {}
summary: Bulk update queries
tags:
- streams
x-state: Technical Preview
/api/streams/{name}/queries/{queryId}:
delete:
description: Remove a query from a stream. Noop if the query is not found on the stream.
operationId: delete-streams-name-queries-queryid
parameters:
- description: A required header to protect against CSRF attacks
in: header
name: kbn-xsrf
required: true
schema:
example: 'true'
type: string
- in: path
name: name
required: true
schema:
type: string
- in: path
name: queryId
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
anyOf:
- additionalProperties: false
type: object
properties: {}
- enum:
- 'null'
nullable: true
- not: {}
responses: {}
summary: Remove a query from a stream
tags:
- streams
x-state: Technical Preview
put:
description: Adds a query to a stream. Noop if the query is already present on the stream.
operationId: put-streams-name-queries-queryid
parameters:
- description: A required header to protect against CSRF attacks
in: header
name: kbn-xsrf
required: true
schema:
example: 'true'
type: string
- in: path
name: name
required: true
schema:
type: string
- in: path
name: queryId
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
additionalProperties: false
type: object
properties:
kql:
additionalProperties: false
type: object
properties:
query:
minLength: 1
type: string
required:
- query
title:
minLength: 1
type: string
required:
- title
- kql
responses: {}
summary: Upsert a query to a stream
tags:
- streams
x-state: Technical Preview
/api/streams/{name}/significant_events:
get:
description: Read the significant events
operationId: get-streams-name-significant-events
parameters:
- in: path
name: name
required: true
schema:
type: string
- in: query
name: from
required: true
schema:
format: date-time
type: string
- in: query
name: to
required: true
schema:
format: date-time
type: string
- in: query
name: bucketSize
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
anyOf:
- additionalProperties: false
type: object
properties: {}
- enum:
- 'null'
nullable: true
- not: {}
responses: {}
summary: Read the significant events
tags:
- streams
x-state: Technical Preview
/api/task_manager/_health:
get:
description: |

View file

@ -46708,8 +46708,37 @@ paths:
minLength: 1
type: string
type: array
queries:
items:
allOf:
- type: object
properties:
id:
minLength: 1
type: string
title:
minLength: 1
type: string
required:
- id
- title
- type: object
properties:
kql:
additionalProperties: false
type: object
properties:
query:
minLength: 1
type: string
required:
- query
required:
- kql
type: array
required:
- dashboards
- queries
- type: object
properties:
stream:
@ -47768,8 +47797,37 @@ paths:
minLength: 1
type: string
type: array
queries:
items:
allOf:
- type: object
properties:
id:
minLength: 1
type: string
title:
minLength: 1
type: string
required:
- id
- title
- type: object
properties:
kql:
additionalProperties: false
type: object
properties:
query:
minLength: 1
type: string
required:
- query
required:
- kql
type: array
required:
- dashboards
- queries
- type: object
properties:
stream:
@ -48681,8 +48739,37 @@ paths:
minLength: 1
type: string
type: array
queries:
items:
allOf:
- type: object
properties:
id:
minLength: 1
type: string
title:
minLength: 1
type: string
required:
- id
- title
- type: object
properties:
kql:
additionalProperties: false
type: object
properties:
query:
minLength: 1
type: string
required:
- query
required:
- kql
type: array
required:
- dashboards
- queries
- type: object
properties:
stream:
@ -51124,6 +51211,243 @@ paths:
tags:
- streams
x-state: Technical Preview
/api/streams/{name}/queries:
get:
description: Fetches all queries linked to a stream that are visible to the current user in the current space.
operationId: get-streams-name-queries
parameters:
- in: path
name: name
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
anyOf:
- additionalProperties: false
type: object
properties: {}
- enum:
- 'null'
nullable: true
- not: {}
responses: {}
summary: Get stream queries
tags:
- streams
x-state: Technical Preview
/api/streams/{name}/queries/_bulk:
post:
description: Bulk update queries of a stream. Can add new queries and delete existing ones.
operationId: post-streams-name-queries-bulk
parameters:
- description: A required header to protect against CSRF attacks
in: header
name: kbn-xsrf
required: true
schema:
example: 'true'
type: string
- in: path
name: name
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
additionalProperties: false
type: object
properties:
operations:
items:
anyOf:
- additionalProperties: false
type: object
properties:
index:
allOf:
- type: object
properties:
id:
minLength: 1
type: string
title:
minLength: 1
type: string
required:
- id
- title
- type: object
properties:
kql:
additionalProperties: false
type: object
properties:
query:
minLength: 1
type: string
required:
- query
required:
- kql
required:
- index
- additionalProperties: false
type: object
properties:
delete:
additionalProperties: false
type: object
properties:
id:
type: string
required:
- id
required:
- delete
type: array
required:
- operations
responses: {}
summary: Bulk update queries
tags:
- streams
x-state: Technical Preview
/api/streams/{name}/queries/{queryId}:
delete:
description: Remove a query from a stream. Noop if the query is not found on the stream.
operationId: delete-streams-name-queries-queryid
parameters:
- description: A required header to protect against CSRF attacks
in: header
name: kbn-xsrf
required: true
schema:
example: 'true'
type: string
- in: path
name: name
required: true
schema:
type: string
- in: path
name: queryId
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
anyOf:
- additionalProperties: false
type: object
properties: {}
- enum:
- 'null'
nullable: true
- not: {}
responses: {}
summary: Remove a query from a stream
tags:
- streams
x-state: Technical Preview
put:
description: Adds a query to a stream. Noop if the query is already present on the stream.
operationId: put-streams-name-queries-queryid
parameters:
- description: A required header to protect against CSRF attacks
in: header
name: kbn-xsrf
required: true
schema:
example: 'true'
type: string
- in: path
name: name
required: true
schema:
type: string
- in: path
name: queryId
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
additionalProperties: false
type: object
properties:
kql:
additionalProperties: false
type: object
properties:
query:
minLength: 1
type: string
required:
- query
title:
minLength: 1
type: string
required:
- title
- kql
responses: {}
summary: Upsert a query to a stream
tags:
- streams
x-state: Technical Preview
/api/streams/{name}/significant_events:
get:
description: Read the significant events
operationId: get-streams-name-significant-events
parameters:
- in: path
name: name
required: true
schema:
type: string
- in: query
name: from
required: true
schema:
format: date-time
type: string
- in: query
name: to
required: true
schema:
format: date-time
type: string
- in: query
name: bucketSize
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
anyOf:
- additionalProperties: false
type: object
properties: {}
- enum:
- 'null'
nullable: true
- not: {}
responses: {}
summary: Read the significant events
tags:
- streams
x-state: Technical Preview
/api/task_manager/_health:
get:
description: |

View file

@ -135,24 +135,24 @@ type Exact<T, U> = T extends U
// The IStorageClient type then checks if the application type is a subset of the storage
// document type. If this is not the case, the IStorageClient type is set to never, which
// will cause a type error in the consuming code.
export type IStorageClient<TSchema extends IndexStorageSettings, TApplicationType> = Exact<
ApplicationDocument<TApplicationType>,
Partial<StorageDocumentOf<TSchema>>
> extends true
? InternalIStorageClient<ApplicationDocument<TApplicationType>>
export type IStorageClient<
TSchema extends IndexStorageSettings,
TApplicationType extends StorageDocumentOf<TSchema>
> = Exact<TApplicationType, StorageDocumentOf<TSchema>> extends true
? InternalIStorageClient<TApplicationType>
: never;
export type SimpleIStorageClient<TStorageSettings extends IndexStorageSettings> = IStorageClient<
TStorageSettings,
Omit<StorageDocumentOf<TStorageSettings>, '_id'>
StorageDocumentOf<TStorageSettings>
>;
export type ApplicationDocument<TApplicationType> = TApplicationType & { _id: string };
export type StorageDocumentOf<TStorageSettings extends StorageSettings> = StorageFieldTypeOf<{
type: 'object';
properties: TStorageSettings['schema']['properties'];
}> & { _id: string };
export type StorageDocumentOf<TStorageSettings extends StorageSettings> = Partial<
StorageFieldTypeOf<{
type: 'object';
properties: TStorageSettings['schema']['properties'];
}>
>;
export { StorageIndexAdapter } from './src/index_adapter';

View file

@ -35,7 +35,6 @@ import {
StorageClientSearchResponse,
StorageClientClean,
StorageClientCleanResponse,
ApplicationDocument,
InternalIStorageClient,
} from '../..';
import { getSchemaVersion } from '../get_schema_version';
@ -97,7 +96,10 @@ function wrapEsCall<T>(p: Promise<T>): Promise<T> {
* - Index Lifecycle Management
* - Schema upgrades w/ fallbacks
*/
export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings, TApplicationType> {
export class StorageIndexAdapter<
TStorageSettings extends IndexStorageSettings,
TApplicationType extends Partial<StorageDocumentOf<TStorageSettings>>
> {
private readonly logger: Logger;
constructor(
private readonly esClient: ElasticsearchClient,
@ -320,7 +322,7 @@ export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings,
return [];
}
private search: StorageClientSearch<ApplicationDocument<TApplicationType>> = async (request) => {
private search: StorageClientSearch<TApplicationType> = async (request) => {
return (await wrapEsCall(
this.esClient
.search({
@ -328,7 +330,7 @@ export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings,
index: this.getSearchIndexPattern(),
allow_no_indices: true,
})
.catch((error): StorageClientSearchResponse<StorageDocumentOf<TStorageSettings>, any> => {
.catch((error): StorageClientSearchResponse<TApplicationType, any> => {
if (isNotFoundError(error)) {
return {
_shards: {
@ -349,10 +351,10 @@ export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings,
}
throw error;
})
)) as unknown as ReturnType<StorageClientSearch<ApplicationDocument<TApplicationType>>>;
)) as unknown as ReturnType<StorageClientSearch<TApplicationType>>;
};
private index: StorageClientIndex<ApplicationDocument<TApplicationType>> = async ({
private index: StorageClientIndex<TApplicationType> = async ({
id,
refresh = 'wait_for',
...request
@ -391,7 +393,7 @@ export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings,
});
};
private bulk: StorageClientBulk<ApplicationDocument<TApplicationType>> = ({
private bulk: StorageClientBulk<TApplicationType> = ({
operations,
refresh = 'wait_for',
...request
@ -522,10 +524,7 @@ export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings,
return { acknowledged: true, result: 'not_found' };
};
private get: StorageClientGet<ApplicationDocument<TApplicationType>> = async ({
id,
...request
}) => {
private get: StorageClientGet<TApplicationType> = async ({ id, ...request }) => {
const response = await this.search({
track_total_hits: false,
size: 1,
@ -566,7 +565,7 @@ export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings,
_id: hit._id!,
_index: hit._index,
found: true,
_source: hit._source as ApplicationDocument<TApplicationType>,
_source: hit._source as TApplicationType,
_ignored: hit._ignored,
_primary_term: hit._primary_term,
_routing: hit._routing,
@ -582,7 +581,7 @@ export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings,
});
};
getClient(): InternalIStorageClient<ApplicationDocument<TApplicationType>> {
getClient(): InternalIStorageClient<TApplicationType> {
return {
bulk: this.bulk,
delete: this.delete,
@ -596,4 +595,4 @@ export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings,
}
export type SimpleStorageIndexAdapter<TStorageSettings extends IndexStorageSettings> =
StorageIndexAdapter<TStorageSettings, Omit<StorageDocumentOf<TStorageSettings>, '_id'>>;
StorageIndexAdapter<TStorageSettings, StorageDocumentOf<TStorageSettings>>;

View file

@ -33,6 +33,7 @@ const primitiveTypes = [
ZodFirstPartyTypeKind.ZodLiteral,
ZodFirstPartyTypeKind.ZodEnum,
ZodFirstPartyTypeKind.ZodNativeEnum,
ZodFirstPartyTypeKind.ZodDate,
] as const;
/**
@ -56,6 +57,7 @@ const typeNames = [
ZodFirstPartyTypeKind.ZodDefault,
ZodFirstPartyTypeKind.ZodLazy,
ZodFirstPartyTypeKind.ZodNullable,
ZodFirstPartyTypeKind.ZodPipeline,
...primitiveTypes,
...dangerousTypes,
] as const;
@ -235,6 +237,9 @@ function getHandlingSchemas(schema: z.Schema, key: string, value: object): z.Sch
case ZodFirstPartyTypeKind.ZodNullable:
return [def.innerType];
case ZodFirstPartyTypeKind.ZodPipeline:
return [def.in];
}
}

View file

@ -5,21 +5,70 @@
* 2.0.
*/
import { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
import { z } from '@kbn/zod';
import { NonEmptyString } from '@kbn/zod-helpers';
import { primitive } from '../record_types';
import { createIsNarrowSchema } from '../../helpers';
interface StreamQueryBase {
id: string;
title: string;
}
export interface StreamQueryKql extends StreamQueryBase {
kql: {
query: string;
};
}
export type StreamQuery = StreamQueryKql;
export interface StreamGetResponseBase {
dashboards: string[];
queries: StreamQuery[];
}
export interface StreamUpsertRequestBase {
dashboards: string[];
queries: StreamQuery[];
}
const streamQueryBaseSchema: z.Schema<StreamQueryBase> = z.object({
id: NonEmptyString,
title: NonEmptyString,
});
export const streamQueryKqlSchema: z.Schema<StreamQueryKql> = z.intersection(
streamQueryBaseSchema,
z.object({
kql: z.object({
query: NonEmptyString,
}),
})
);
export const querySchema: z.ZodType<QueryDslQueryContainer> = z.lazy(() =>
z.record(z.union([primitive, z.array(z.union([primitive, querySchema])), querySchema]))
);
export const streamQuerySchema: z.Schema<StreamQuery> = streamQueryKqlSchema;
export const upsertStreamQueryRequestSchema = z.object({
title: NonEmptyString,
kql: z.object({
query: NonEmptyString,
}),
});
export const isStreamQueryKql = createIsNarrowSchema(streamQuerySchema, streamQueryKqlSchema);
export const streamUpsertRequestSchemaBase: z.Schema<StreamUpsertRequestBase> = z.object({
dashboards: z.array(NonEmptyString),
queries: z.array(streamQuerySchema),
});
export const streamGetResponseSchemaBase: z.Schema<StreamGetResponseBase> = z.object({
dashboards: z.array(NonEmptyString),
queries: z.array(streamQuerySchema),
});

View file

@ -6,9 +6,11 @@
*/
export * from './ingest';
export * from './base/api';
export * from './api';
export * from './core';
export * from './helpers';
export * from './group';
export * from './record_types';
export * from './content';
export * from './significant_events';

View file

@ -149,6 +149,8 @@ export {
ingestStreamUpsertRequestSchema,
ingestUpsertRequestSchema,
ingestStreamGetResponseSchema,
wiredStreamUpsertRequestSchema,
unwiredStreamUpsertRequestSchema,
wiredStreamGetResponseSchema,
unwiredStreamGetResponseSchema,
type IngestGetResponse,

View file

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { StreamQueryKql } from '../base/api';
/**
* SignificantEvents Get Response
*/
type ChangePointsType =
| 'dip'
| 'distribution_change'
| 'non_stationary'
| 'spike'
| 'stationary'
| 'step_change'
| 'trend_change';
type SignificantEventsResponse = StreamQueryKql & {
occurrences: Array<{ date: string; count: number }>;
change_points: {
type: Record<ChangePointsType, { p_value: number; change_point: number }>;
};
};
type SignificantEventsGetResponse = SignificantEventsResponse[];
export type { SignificantEventsResponse, SignificantEventsGetResponse };

View file

@ -0,0 +1,8 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export * from './api';

View file

@ -6,35 +6,60 @@
*/
import { ValuesType } from 'utility-types';
import { StreamQuery } from '@kbn/streams-schema';
export const ASSET_TYPES = {
Dashboard: 'dashboard' as const,
Rule: 'rule' as const,
Slo: 'slo' as const,
Query: 'query' as const,
};
export type AssetType = ValuesType<typeof ASSET_TYPES>;
export interface AssetLink<TAssetType extends AssetType = AssetType> {
assetType: TAssetType;
assetId: string;
interface AssetLinkBase<TAssetType extends AssetType = AssetType> {
'asset.uuid': string;
'asset.type': TAssetType;
'asset.id': string;
}
export type DashboardLink = AssetLink<'dashboard'>;
export type SloLink = AssetLink<'slo'>;
export type RuleLink = AssetLink<'rule'>;
export type DashboardLink = AssetLinkBase<'dashboard'>;
export type SloLink = AssetLinkBase<'slo'>;
export type RuleLink = AssetLinkBase<'rule'>;
export type QueryLink = AssetLinkBase<'query'> & {
query: StreamQuery;
};
export interface Asset<TAssetType extends AssetType = AssetType> extends AssetLink<TAssetType> {
label: string;
export type AssetLink = DashboardLink | SloLink | RuleLink | QueryLink;
type OmitFrom<T, K> = T extends any ? (K extends keyof T ? Omit<T, K> : never) : never;
export type AssetLinkRequest = OmitFrom<AssetLink, 'asset.uuid'>;
export type AssetUnlinkRequest = Pick<AssetLink, 'asset.type' | 'asset.id'>;
interface AssetBase<TAssetType extends AssetType> extends AssetLinkBase<TAssetType> {
title: string;
}
interface SavedObjectAssetBase<TAssetType extends AssetType = AssetType>
extends AssetBase<TAssetType> {
tags: string[];
}
export type DashboardAsset = Asset<'dashboard'>;
export type SloAsset = Asset<'slo'>;
export type RuleAsset = Asset<'rule'>;
export type DashboardAsset = SavedObjectAssetBase<'dashboard'>;
export type SloAsset = SavedObjectAssetBase<'slo'>;
export type RuleAsset = SavedObjectAssetBase<'rule'>;
export type QueryAsset = AssetBase<'query'> & {
query: StreamQuery;
};
export type Asset = DashboardAsset | SloAsset | RuleAsset | QueryAsset;
export type AssetWithoutUuid = Omit<AssetLink, 'asset.uuid'>;
export interface AssetTypeToAssetMap {
[ASSET_TYPES.Dashboard]: DashboardAsset;
[ASSET_TYPES.Slo]: SloAsset;
[ASSET_TYPES.Rule]: RuleAsset;
[ASSET_TYPES.Query]: QueryAsset;
}

View file

@ -4,33 +4,48 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
import { SanitizedRule } from '@kbn/alerting-plugin/common';
import { RulesClient } from '@kbn/alerting-plugin/server';
import { SavedObject, SavedObjectsClientContract } from '@kbn/core/server';
import { IStorageClient } from '@kbn/storage-adapter';
import { keyBy } from 'lodash';
import { keyBy, partition } from 'lodash';
import objectHash from 'object-hash';
import pLimit from 'p-limit';
import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
import {
ASSET_TYPES,
Asset,
AssetLink,
AssetLinkRequest,
AssetType,
DashboardAsset,
SloAsset,
RuleAsset,
AssetUnlinkRequest,
AssetWithoutUuid,
DashboardLink,
QueryAsset,
QueryLink,
RuleLink,
SloLink,
} from '../../../../common/assets';
import { ASSET_ENTITY_ID, ASSET_ENTITY_TYPE, ASSET_TYPE } from './fields';
import {
ASSET_ID,
ASSET_TYPE,
ASSET_UUID,
QUERY_KQL_BODY,
QUERY_TITLE,
STREAM_NAME,
} from './fields';
import { AssetStorageSettings } from './storage_settings';
import { AssetNotFoundError } from '../errors/asset_not_found_error';
interface TermQueryOpts {
queryEmptyString: boolean;
}
type TermQueryFieldValue = string | boolean | number | null;
function termQuery<T extends string>(
field: T,
value: string | boolean | number | undefined | null,
value: TermQueryFieldValue | undefined,
opts: TermQueryOpts = { queryEmptyString: true }
): QueryDslQueryContainer[] {
if (value === null || value === undefined || (!opts.queryEmptyString && value === '')) {
@ -40,76 +55,126 @@ function termQuery<T extends string>(
return [{ term: { [field]: value } }];
}
function termsQuery<T extends string>(
field: T,
values: Array<TermQueryFieldValue | undefined> | null | undefined
): QueryDslQueryContainer[] {
if (values === null || values === undefined || values.length === 0) {
return [];
}
const filteredValues = values.filter(
(value) => value !== undefined
) as unknown as TermQueryFieldValue[];
return [{ terms: { [field]: filteredValues } }];
}
function getUuid(name: string, asset: Pick<AssetLink, 'asset.id' | 'asset.type'>) {
return objectHash({
[STREAM_NAME]: name,
[ASSET_ID]: asset[ASSET_ID],
[ASSET_TYPE]: asset[ASSET_TYPE],
});
}
function toAssetLink<TAssetLink extends AssetLinkRequest>(
name: string,
asset: TAssetLink
): TAssetLink & { [ASSET_UUID]: string } {
return {
...asset,
[ASSET_UUID]: getUuid(name, asset),
};
}
function sloSavedObjectToAsset(
sloId: string,
savedObject: SavedObject<{ name: string; tags: string[] }>
): SloAsset {
) {
return {
assetId: sloId,
label: savedObject.attributes.name,
[ASSET_ID]: sloId,
[ASSET_TYPE]: 'slo' as const,
title: savedObject.attributes.name,
tags: savedObject.attributes.tags.concat(
savedObject.references.filter((ref) => ref.type === 'tag').map((ref) => ref.id)
),
assetType: 'slo',
};
}
function dashboardSavedObjectToAsset(
dashboardId: string,
savedObject: SavedObject<{ title: string }>
): DashboardAsset {
) {
return {
assetId: dashboardId,
label: savedObject.attributes.title,
[ASSET_ID]: dashboardId,
[ASSET_TYPE]: 'dashboard' as const,
title: savedObject.attributes.title,
tags: savedObject.references.filter((ref) => ref.type === 'tag').map((ref) => ref.id),
assetType: 'dashboard',
};
}
function ruleToAsset(ruleId: string, rule: SanitizedRule): RuleAsset {
function ruleToAsset(ruleId: string, rule: SanitizedRule) {
return {
assetType: 'rule',
assetId: ruleId,
label: rule.name,
[ASSET_TYPE]: 'rule' as const,
[ASSET_ID]: ruleId,
title: rule.name,
tags: rule.tags,
};
}
function getAssetDocument({
assetId,
entityId,
entityType,
assetType,
}: AssetLink & { entityId: string; entityType: string }) {
const doc = {
'asset.id': assetId,
'asset.type': assetType,
'entity.id': entityId,
'entity.type': entityType,
};
type StoredQueryLink = Omit<QueryLink, 'query'> & {
[QUERY_TITLE]: string;
[QUERY_KQL_BODY]: string;
};
return {
_id: objectHash(doc),
...doc,
};
}
export type StoredAssetLink = (SloLink | RuleLink | DashboardLink | StoredQueryLink) & {
[STREAM_NAME]: string;
};
interface AssetBulkIndexOperation {
index: { asset: AssetLink };
index: { asset: AssetLinkRequest };
}
interface AssetBulkDeleteOperation {
delete: { asset: AssetLink };
delete: { asset: AssetUnlinkRequest };
}
function fromStorage(link: StoredAssetLink): AssetLink {
if (link['asset.type'] === 'query') {
return {
...link,
query: {
id: link['asset.id'],
title: link['query.title'],
kql: {
query: link['query.kql.query'],
},
},
} satisfies QueryLink;
}
return link;
}
function toStorage(name: string, request: AssetLinkRequest): StoredAssetLink {
const link = toAssetLink(name, request);
if (link['asset.type'] === 'query') {
const { query, ...rest } = link;
return {
...rest,
[STREAM_NAME]: name,
'query.title': query.title,
'query.kql.query': query.kql.query,
};
}
return {
...link,
[STREAM_NAME]: name,
};
}
export type AssetBulkOperation = AssetBulkIndexOperation | AssetBulkDeleteOperation;
export interface StoredAssetLink {
'asset.id': string;
'asset.type': AssetType;
'entity.id': string;
'entity.type': string;
}
export class AssetClient {
constructor(
private readonly clients: {
@ -119,178 +184,132 @@ export class AssetClient {
}
) {}
async linkAsset(
properties: {
entityId: string;
entityType: string;
} & AssetLink
) {
const { _id: id, ...document } = getAssetDocument(properties);
async linkAsset(name: string, link: AssetLinkRequest) {
const document = toStorage(name, link);
await this.clients.storageClient.index({
id,
id: document[ASSET_UUID],
document,
});
}
async syncAssetList({
entityId,
entityType,
assetType,
assetIds,
}: {
entityId: string;
entityType: string;
assetType: AssetType;
assetIds: string[];
}) {
async syncAssetList(
name: string,
links: AssetLinkRequest[]
): Promise<{ deleted: AssetLink[]; indexed: AssetLink[] }> {
const assetsResponse = await this.clients.storageClient.search({
size: 10_000,
track_total_hits: false,
query: {
bool: {
filter: [
...termQuery(ASSET_ENTITY_ID, entityId),
...termQuery(ASSET_ENTITY_TYPE, entityType),
...termQuery(ASSET_TYPE, assetType),
],
filter: [...termQuery(STREAM_NAME, name)],
},
},
});
const existingAssetLinks = assetsResponse.hits.hits.map((hit) => hit._source);
const existingAssetLinks = assetsResponse.hits.hits.map((hit) => {
return fromStorage(hit._source);
});
const newAssetIds = assetIds.filter(
(assetId) =>
!existingAssetLinks.some((existingAssetLink) => existingAssetLink['asset.id'] === assetId)
);
const newStoredLinks = links.map((link) => {
return toAssetLink(name, link);
});
const assetIdsToRemove = existingAssetLinks
.map((existingAssetLink) => existingAssetLink['asset.id'])
.filter((assetId) => !assetIds.includes(assetId));
const nextIds = newStoredLinks.map((link) => link[ASSET_UUID]);
await Promise.all([
...newAssetIds.map((assetId) =>
this.linkAsset({
entityId,
entityType,
assetId,
assetType,
})
),
...assetIdsToRemove.map((assetId) =>
this.unlinkAsset({
entityId,
entityType,
assetId,
assetType,
})
),
]);
const docsToRemove = existingAssetLinks.filter((link) => !nextIds.includes(link[ASSET_UUID]));
const operations: AssetBulkOperation[] = [
...docsToRemove.map((asset) => ({ delete: { asset } })),
...newStoredLinks.map((asset) => ({ index: { asset } })),
];
if (operations.length) {
await this.bulk(name, operations);
}
return {
deleted: docsToRemove,
indexed: newStoredLinks,
};
}
async unlinkAsset(
properties: {
entityId: string;
entityType: string;
} & AssetLink
) {
const { _id: id } = getAssetDocument(properties);
async unlinkAsset(name: string, asset: AssetUnlinkRequest) {
const id = getUuid(name, asset);
await this.clients.storageClient.delete({ id });
const { result } = await this.clients.storageClient.delete({ id });
if (result === 'not_found') {
throw new AssetNotFoundError(`${asset[ASSET_TYPE]} not found`);
}
}
async clean() {
await this.clients.storageClient.clean();
}
async getAssetIds({
entityId,
entityType,
assetType,
}: {
entityId: string;
entityType: 'stream';
assetType: AssetType;
}): Promise<string[]> {
async getAssetLinks<TAssetType extends AssetType>(
name: string,
types?: TAssetType[]
): Promise<Array<Extract<AssetLink, { [ASSET_TYPE]: TAssetType }>>> {
const assetsResponse = await this.clients.storageClient.search({
size: 10_000,
track_total_hits: false,
query: {
bool: {
filter: [
...termQuery(ASSET_ENTITY_ID, entityId),
...termQuery(ASSET_ENTITY_TYPE, entityType),
...termQuery(ASSET_TYPE, assetType),
...termQuery(STREAM_NAME, name),
...(types?.length ? termsQuery(ASSET_TYPE, types) : []),
],
},
},
});
return assetsResponse.hits.hits.map((hit) => hit._source['asset.id']);
return assetsResponse.hits.hits.map(
(hit) => fromStorage(hit._source) as Extract<AssetLink, { [ASSET_TYPE]: TAssetType }>
);
}
async bulk(
{ entityId, entityType }: { entityId: string; entityType: string },
operations: AssetBulkOperation[]
) {
async bulk(name: string, operations: AssetBulkOperation[]) {
return await this.clients.storageClient.bulk({
operations: operations.map((operation) => {
const { _id, ...document } = getAssetDocument({
...Object.values(operation)[0].asset,
entityId,
entityType,
});
if ('index' in operation) {
const document = toStorage(name, Object.values(operation)[0].asset as AssetLinkRequest);
return {
index: {
document,
_id,
_id: document[ASSET_UUID],
},
};
}
const id = getUuid(name, operation.delete.asset);
return {
delete: {
_id,
_id: id,
},
};
}),
});
}
async getAssets({
entityId,
entityType,
}: {
entityId: string;
entityType: 'stream';
}): Promise<Asset[]> {
const assetsResponse = await this.clients.storageClient.search({
size: 10_000,
track_total_hits: false,
query: {
bool: {
filter: [
...termQuery(ASSET_ENTITY_ID, entityId),
...termQuery(ASSET_ENTITY_TYPE, entityType),
],
},
},
});
const assetLinks = assetsResponse.hits.hits.map((hit) => hit._source);
async getAssets(name: string): Promise<Asset[]> {
const assetLinks = await this.getAssetLinks(name);
if (!assetLinks.length) {
return [];
}
const [queryAssetLinks, savedObjectAssetLinks] = partition(
assetLinks,
(link): link is QueryLink => link[ASSET_TYPE] === 'query'
);
const idsByType = Object.fromEntries(
Object.values(ASSET_TYPES).map((type) => [type, [] as string[]])
) as Record<AssetType, string[]>;
assetLinks.forEach((assetLink) => {
savedObjectAssetLinks.forEach((assetLink) => {
const assetType = assetLink['asset.type'] as AssetType;
const assetId = assetLink['asset.id'];
idsByType[assetType].push(assetId);
@ -307,7 +326,7 @@ export class AssetClient {
.then((response) => {
const dashboardsById = keyBy(response.saved_objects, 'id');
return idsByType.dashboard.flatMap((dashboardId): Asset[] => {
return idsByType.dashboard.flatMap((dashboardId) => {
const dashboard = dashboardsById[dashboardId];
if (dashboard && !dashboard.error) {
return [dashboardSavedObjectToAsset(dashboardId, dashboard)];
@ -319,7 +338,7 @@ export class AssetClient {
Promise.all(
idsByType.rule.map((ruleId) => {
return limiter(() =>
this.clients.rulesClient.get({ id: ruleId }).then((rule): Asset => {
this.clients.rulesClient.get({ id: ruleId }).then((rule) => {
return ruleToAsset(ruleId, rule);
})
);
@ -337,7 +356,7 @@ export class AssetClient {
.then((soResponse) => {
const sloDefinitionsById = keyBy(soResponse.saved_objects, 'slo.attributes.id');
return idsByType.slo.flatMap((sloId): Asset[] => {
return idsByType.slo.flatMap((sloId) => {
const sloDefinition = sloDefinitionsById[sloId];
if (sloDefinition && !sloDefinition.error) {
return [sloSavedObjectToAsset(sloId, sloDefinition)];
@ -348,7 +367,25 @@ export class AssetClient {
: [],
]);
return [...dashboards, ...rules, ...slos];
const savedObjectAssetsWithUuids = [...dashboards, ...rules, ...slos].map((asset) => {
return {
...asset,
[ASSET_UUID]: getUuid(name, asset),
};
});
return [
...savedObjectAssetsWithUuids,
...queryAssetLinks.map((link): QueryAsset => {
return {
[ASSET_ID]: link[ASSET_ID],
[ASSET_UUID]: link[ASSET_UUID],
[ASSET_TYPE]: link[ASSET_TYPE],
query: link.query,
title: link.query.title,
};
}),
];
}
async getSuggestions({
@ -359,7 +396,7 @@ export class AssetClient {
query: string;
assetTypes?: AssetType[];
tags?: string[];
}): Promise<{ hasMore: boolean; assets: Asset[] }> {
}): Promise<{ hasMore: boolean; assets: AssetWithoutUuid[] }> {
const perPage = 101;
const searchAll = !assetTypes;

View file

@ -5,7 +5,10 @@
* 2.0.
*/
export const ASSET_ENTITY_ID = 'entity.id';
export const ASSET_ENTITY_TYPE = 'entity.type';
export const ASSET_ASSET_ID = 'asset.id';
export const STREAM_NAME = 'stream.name';
export const ASSET_UUID = 'asset.uuid';
export const ASSET_ID = 'asset.id';
export const ASSET_TYPE = 'asset.type';
export const QUERY_KQL_BODY = 'query.kql.query';
export const QUERY_TITLE = 'query.title';

View file

@ -6,16 +6,25 @@
*/
import { IndexStorageSettings, types } from '@kbn/storage-adapter';
import { ASSET_ASSET_ID, ASSET_ENTITY_ID, ASSET_ENTITY_TYPE, ASSET_TYPE } from './fields';
import {
ASSET_ID,
ASSET_TYPE,
ASSET_UUID,
QUERY_KQL_BODY,
QUERY_TITLE,
STREAM_NAME,
} from './fields';
export const assetStorageSettings = {
name: '.kibana_streams_assets',
schema: {
properties: {
[ASSET_ASSET_ID]: types.keyword(),
[ASSET_UUID]: types.keyword(),
[ASSET_ID]: types.keyword(),
[ASSET_TYPE]: types.keyword(),
[ASSET_ENTITY_ID]: types.keyword(),
[ASSET_ENTITY_TYPE]: types.keyword(),
[STREAM_NAME]: types.keyword(),
[QUERY_KQL_BODY]: types.match_only_text(),
[QUERY_TITLE]: types.keyword(),
},
},
} satisfies IndexStorageSettings;

View file

@ -11,7 +11,7 @@ import {
QueryDslQueryContainer,
Result,
} from '@elastic/elasticsearch/lib/api/types';
import type { IScopedClusterClient, Logger } from '@kbn/core/server';
import { type IScopedClusterClient, type KibanaRequest, type Logger } from '@kbn/core/server';
import { isResponseError } from '@kbn/es-errors';
import {
Condition,
@ -71,6 +71,7 @@ import { MalformedStreamIdError } from './errors/malformed_stream_id_error';
import { SecurityError } from './errors/security_error';
import { NameTakenError } from './errors/name_taken_error';
import { MalformedStreamError } from './errors/malformed_stream_error';
import { ASSET_ID, ASSET_TYPE } from './assets/fields';
interface AcknowledgeResponse<TResult extends Result> {
acknowledged: true;
@ -93,6 +94,20 @@ function isDefinitionNotFoundError(error: unknown): error is DefinitionNotFoundE
return error instanceof DefinitionNotFoundError;
}
/*
* When calling into Elasticsearch, the stack trace is lost.
* If we create an error before calling, and append it to
* any stack of the caught error, we get a more useful stack
* trace.
*/
function wrapEsCall<T>(p: Promise<T>): Promise<T> {
const error = new Error();
return p.catch((caughtError) => {
caughtError.stack += error.stack;
throw caughtError;
});
}
export class StreamsClient {
constructor(
private readonly dependencies: {
@ -101,6 +116,7 @@ export class StreamsClient {
storageClient: StreamsStorageClient;
logger: Logger;
isServerless: boolean;
request: KibanaRequest;
}
) {}
@ -136,6 +152,7 @@ export class StreamsClient {
await this.upsertStream({
request: {
dashboards: [],
queries: [],
stream: omit(rootStreamDefinition, 'name'),
},
name: rootStreamDefinition.name,
@ -150,10 +167,11 @@ export class StreamsClient {
* such as data streams. That means it deletes all data
* belonging to wired streams.
*
* It does NOT delete ingest streams.
* It does NOT delete unwired streams.
*/
async disableStreams(): Promise<DisableStreamsResponse> {
const isEnabled = await this.isStreamsEnabled();
if (!isEnabled) {
return { acknowledged: true, result: 'noop' };
}
@ -243,7 +261,7 @@ export class StreamsClient {
request: StreamUpsertRequest;
}): Promise<UpsertStreamResponse> {
const stream: StreamDefinition = { ...request.stream, name };
const { dashboards } = request;
const { dashboards, queries } = request;
const { result, parentDefinition } = await this.validateAndUpsertStream({
definition: stream,
});
@ -275,6 +293,7 @@ export class StreamsClient {
name: parentId,
request: {
dashboards: [],
queries: [],
stream: {
ingest: {
lifecycle: { inherit: {} },
@ -295,12 +314,19 @@ export class StreamsClient {
}
}
await this.dependencies.assetClient.syncAssetList({
entityId: stream.name,
entityType: 'stream',
assetIds: dashboards,
assetType: 'dashboard',
});
const queryLinks = queries.map((query) => ({
[ASSET_ID]: query.id,
[ASSET_TYPE]: 'query' as const,
query,
}));
await this.dependencies.assetClient.syncAssetList(stream.name, [
...dashboards.map((dashboard) => ({
[ASSET_ID]: dashboard,
[ASSET_TYPE]: 'dashboard' as const,
})),
...queryLinks,
]);
return { acknowledged: true, result };
}
@ -689,28 +715,28 @@ export class StreamsClient {
}
async getDataStream(name: string): Promise<IndicesDataStream> {
return this.dependencies.scopedClusterClient.asCurrentUser.indices
.getDataStream({ name })
.then((response) => {
if (response.data_streams.length === 0) {
throw new errors.ResponseError({
meta: {
aborted: false,
attempts: 1,
connection: null,
context: null,
name: 'resource_not_found_exception',
request: {} as unknown as DiagnosticResult['meta']['request'],
},
warnings: [],
body: 'resource_not_found_exception',
statusCode: 404,
});
}
return wrapEsCall(
this.dependencies.scopedClusterClient.asCurrentUser.indices.getDataStream({ name })
).then((response) => {
if (response.data_streams.length === 0) {
throw new errors.ResponseError({
meta: {
aborted: false,
attempts: 1,
connection: null,
context: null,
name: 'resource_not_found_exception',
request: {} as unknown as DiagnosticResult['meta']['request'],
},
warnings: [],
body: 'resource_not_found_exception',
statusCode: 404,
});
}
const dataStream = response.data_streams[0];
return dataStream;
});
const dataStream = response.data_streams[0];
return dataStream;
});
}
/**
@ -774,8 +800,9 @@ export class StreamsClient {
* stored definition).
*/
private async getUnmanagedDataStreams(): Promise<UnwiredStreamDefinition[]> {
const response =
await this.dependencies.scopedClusterClient.asCurrentUser.indices.getDataStream();
const response = await wrapEsCall(
this.dependencies.scopedClusterClient.asCurrentUser.indices.getDataStream()
);
return response.data_streams.map((dataStream) => ({
name: dataStream.name,
@ -859,12 +886,7 @@ export class StreamsClient {
await deleteStreamObjects({ scopedClusterClient, name: definition.name, logger });
}
await assetClient.syncAssetList({
entityId: definition.name,
entityType: 'stream',
assetType: 'dashboard',
assetIds: [],
});
await assetClient.syncAssetList(definition.name, []);
await this.dependencies.storageClient.delete({ id: definition.name });
}

View file

@ -5,8 +5,8 @@
* 2.0.
*/
import { ElasticsearchClient, Logger } from '@kbn/core/server';
import { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types';
import { ElasticsearchClient, Logger } from '@kbn/core/server';
import { IngestStreamLifecycle, isDslLifecycle, isIlmLifecycle } from '@kbn/streams-schema';
import { retryTransientEsErrors } from '../helpers/retry';

View file

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { StatusError } from './status_error';
export class AssetNotFoundError extends StatusError {
constructor(message: string) {
super(message, 404);
this.name = 'AssetNotFoundError';
}
}

View file

@ -47,11 +47,10 @@ export class StreamsService {
const isServerless = coreStart.elasticsearch.getCapabilities().serverless;
const storageAdapter = new StorageIndexAdapter<StreamsStorageSettings, StreamDefinition>(
scopedClusterClient.asInternalUser,
logger,
streamsStorageSettings
);
const storageAdapter = new StorageIndexAdapter<
StreamsStorageSettings,
StreamDefinition & { _id: string }
>(scopedClusterClient.asInternalUser, logger, streamsStorageSettings);
return new StreamsClient({
assetClient,
@ -59,6 +58,7 @@ export class StreamsService {
scopedClusterClient,
storageClient: storageAdapter.getClient(),
isServerless,
request,
});
}
}

View file

@ -5,18 +5,21 @@
* 2.0.
*/
import { Readable } from 'stream';
import { z } from '@kbn/zod';
import { createSavedObjectsStreamFromNdJson } from '@kbn/core-saved-objects-server-internal/src/routes/utils';
import { ContentPack, contentPackSchema } from '@kbn/streams-schema';
import {
createConcatStream,
createListStream,
createMapStream,
createPromiseFromStreams,
} from '@kbn/utils';
import { createSavedObjectsStreamFromNdJson } from '@kbn/core-saved-objects-server-internal/src/routes/utils';
import { ContentPack, contentPackSchema } from '@kbn/streams-schema';
import { createServerRoute } from '../create_server_route';
import { z } from '@kbn/zod';
import { Readable } from 'stream';
import { Asset } from '../../../common';
import { DashboardAsset, DashboardLink } from '../../../common/assets';
import { ASSET_ID, ASSET_TYPE } from '../../lib/streams/assets/fields';
import { StatusError } from '../../lib/streams/errors/status_error';
import { createServerRoute } from '../create_server_route';
const exportContentRoute = createServerRoute({
endpoint: 'POST /api/streams/{name}/content/export 2023-10-31',
@ -42,9 +45,11 @@ const exportContentRoute = createServerRoute({
await streamsClient.ensureStream(params.path.name);
const dashboards = await assetClient
.getAssets({ entityId: params.path.name, entityType: 'stream' })
.then((assets) => assets.filter(({ assetType }) => assetType === 'dashboard'));
function isDashboard(asset: Asset): asset is DashboardAsset {
return asset[ASSET_TYPE] === 'dashboard';
}
const dashboards = (await assetClient.getAssets(params.path.name)).filter(isDashboard);
if (dashboards.length === 0) {
throw new StatusError(`No dashboards are linked to [${params.path.name}] stream`, 400);
}
@ -52,7 +57,7 @@ const exportContentRoute = createServerRoute({
const exporter = (await context.core).savedObjects.getExporter(soClient);
const exportStream = await exporter.exportByObjects({
request,
objects: dashboards.map((dashboard) => ({ id: dashboard.assetId, type: 'dashboard' })),
objects: dashboards.map((dashboard) => ({ id: dashboard[ASSET_ID], type: 'dashboard' })),
includeReferencesDeep: true,
});
@ -131,19 +136,18 @@ const importContentRoute = createServerRoute({
overwrite: true,
});
const createdAssets = (successResults ?? [])
.filter((savedObject) => savedObject.type === 'dashboard')
.map((dashboard) => ({
assetType: 'dashboard' as const,
assetId: dashboard.destinationId ?? dashboard.id,
}));
const createdAssets: Array<Omit<DashboardLink, 'asset.uuid'>> =
successResults
?.filter((savedObject) => savedObject.type === 'dashboard')
.map((dashboard) => ({
[ASSET_TYPE]: 'dashboard',
[ASSET_ID]: dashboard.destinationId ?? dashboard.id,
})) ?? [];
if (createdAssets.length > 0) {
await assetClient.bulk(
{ entityId: params.path.name, entityType: 'stream' },
createdAssets.map((asset) => ({
index: { asset },
}))
params.path.name,
createdAssets.map((asset) => ({ index: { asset } }))
);
}

View file

@ -10,10 +10,11 @@ import { ErrorCause } from '@elastic/elasticsearch/lib/api/types';
import { internal } from '@hapi/boom';
import { Asset, DashboardAsset } from '../../../common/assets';
import { createServerRoute } from '../create_server_route';
import { ASSET_ID, ASSET_TYPE } from '../../lib/streams/assets/fields';
export interface SanitizedDashboardAsset {
id: string;
label: string;
title: string;
tags: string[];
}
@ -41,8 +42,8 @@ export type BulkUpdateAssetsResponse =
function sanitizeDashboardAsset(asset: DashboardAsset): SanitizedDashboardAsset {
return {
id: asset.assetId,
label: asset.label,
id: asset[ASSET_ID],
title: asset.title,
tags: asset.tags,
};
}
@ -79,16 +80,11 @@ const listDashboardsRoute = createServerRoute({
} = params;
function isDashboard(asset: Asset): asset is DashboardAsset {
return asset.assetType === 'dashboard';
return asset[ASSET_TYPE] === 'dashboard';
}
return {
dashboards: (
await assetClient.getAssets({
entityId: streamName,
entityType: 'stream',
})
)
dashboards: (await assetClient.getAssets(streamName))
.filter(isDashboard)
.map(sanitizeDashboardAsset),
};
@ -127,11 +123,9 @@ const linkDashboardRoute = createServerRoute({
await streamsClient.ensureStream(streamName);
await assetClient.linkAsset({
entityId: streamName,
entityType: 'stream',
assetId: dashboardId,
assetType: 'dashboard',
await assetClient.linkAsset(streamName, {
[ASSET_TYPE]: 'dashboard',
[ASSET_ID]: dashboardId,
});
return {
@ -173,11 +167,9 @@ const unlinkDashboardRoute = createServerRoute({
path: { dashboardId, name: streamName },
} = params;
await assetClient.unlinkAsset({
entityId: streamName,
entityType: 'stream',
assetId: dashboardId,
assetType: 'dashboard',
await assetClient.unlinkAsset(streamName, {
[ASSET_ID]: dashboardId,
[ASSET_TYPE]: 'dashboard',
});
return {
@ -290,17 +282,14 @@ const bulkDashboardsRoute = createServerRoute({
await streamsClient.ensureStream(streamName);
const result = await assetClient.bulk(
{
entityId: streamName,
entityType: 'stream',
},
streamName,
operations.map((operation) => {
if ('index' in operation) {
return {
index: {
asset: {
assetType: 'dashboard',
assetId: operation.index.id,
[ASSET_TYPE]: 'dashboard',
[ASSET_ID]: operation.index.id,
},
},
};
@ -308,8 +297,8 @@ const bulkDashboardsRoute = createServerRoute({
return {
delete: {
asset: {
assetType: 'dashboard',
assetId: operation.delete.id,
[ASSET_TYPE]: 'dashboard',
[ASSET_ID]: operation.delete.id,
},
},
};

View file

@ -19,6 +19,8 @@ import { contentRoutes } from './content/route';
import { internalDashboardRoutes } from './internal/dashboards/route';
import { internalCrudRoutes } from './internal/streams/crud/route';
import { internalManagementRoutes } from './internal/streams/management/route';
import { significantEventsRoutes } from './streams/significant_events/route';
import { queryRoutes } from './queries/route';
export const streamsRouteRepository = {
// internal APIs
@ -37,6 +39,8 @@ export const streamsRouteRepository = {
...ingestRoutes,
...groupRoutes,
...contentRoutes,
...significantEventsRoutes,
...queryRoutes,
};
export type StreamsRouteRepository = typeof streamsRouteRepository;

View file

@ -9,6 +9,7 @@ import { z } from '@kbn/zod';
import { DashboardAsset } from '../../../../common/assets';
import { createServerRoute } from '../../create_server_route';
import { SanitizedDashboardAsset } from '../../dashboards/route';
import { ASSET_ID } from '../../../lib/streams/assets/fields';
export interface SuggestDashboardResponse {
suggestions: SanitizedDashboardAsset[];
@ -16,8 +17,8 @@ export interface SuggestDashboardResponse {
function sanitizeDashboardAsset(asset: DashboardAsset): SanitizedDashboardAsset {
return {
id: asset.assetId,
label: asset.label,
id: asset[ASSET_ID],
title: asset.title,
tags: asset.tags,
};
}

View file

@ -0,0 +1,258 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ErrorCause } from '@elastic/elasticsearch/lib/api/types';
import { internal } from '@hapi/boom';
import {
StreamQuery,
streamQuerySchema,
upsertStreamQueryRequestSchema,
} from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import { ASSET_ID, ASSET_TYPE } from '../../lib/streams/assets/fields';
import { createServerRoute } from '../create_server_route';
export interface ListQueriesResponse {
queries: StreamQuery[];
}
export interface UpsertQueryResponse {
acknowledged: boolean;
}
export interface DeleteQueryResponse {
acknowledged: boolean;
}
export type BulkUpdateAssetsResponse = { acknowledged: boolean } | { errors: ErrorCause[] };
const listQueriesRoute = createServerRoute({
endpoint: 'GET /api/streams/{name}/queries 2023-10-31',
options: {
access: 'public',
summary: 'Get stream queries',
description:
'Fetches all queries linked to a stream that are visible to the current user in the current space.',
availability: {
stability: 'experimental',
},
},
params: z.object({
path: z.object({
name: z.string(),
}),
}),
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
async handler({ params, request, getScopedClients }): Promise<ListQueriesResponse> {
const { assetClient, streamsClient } = await getScopedClients({ request });
await streamsClient.ensureStream(params.path.name);
const {
path: { name: streamName },
} = params;
const queryAssets = await assetClient.getAssetLinks(streamName, ['query']);
return {
queries: queryAssets.map((queryAsset) => queryAsset.query),
};
},
});
const upsertQueryRoute = createServerRoute({
endpoint: 'PUT /api/streams/{name}/queries/{queryId} 2023-10-31',
options: {
access: 'public',
summary: 'Upsert a query to a stream',
description: 'Adds a query to a stream. Noop if the query is already present on the stream.',
availability: {
stability: 'experimental',
},
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({
name: z.string(),
queryId: z.string(),
}),
body: upsertStreamQueryRequestSchema,
}),
handler: async ({ params, request, getScopedClients }): Promise<UpsertQueryResponse> => {
const { assetClient, streamsClient } = await getScopedClients({ request });
const {
path: { name: streamName, queryId },
body,
} = params;
await streamsClient.ensureStream(streamName);
await assetClient.linkAsset(streamName, {
[ASSET_TYPE]: 'query',
[ASSET_ID]: queryId,
query: {
id: queryId,
title: body.title,
kql: {
query: body.kql.query,
},
},
});
return {
acknowledged: true,
};
},
});
const deleteQueryRoute = createServerRoute({
endpoint: 'DELETE /api/streams/{name}/queries/{queryId} 2023-10-31',
options: {
access: 'public',
summary: 'Remove a query from a stream',
description: 'Remove a query from a stream. Noop if the query is not found on the stream.',
availability: {
stability: 'experimental',
},
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({
name: z.string(),
queryId: z.string(),
}),
}),
handler: async ({ params, request, getScopedClients }): Promise<DeleteQueryResponse> => {
const { assetClient, streamsClient } = await getScopedClients({ request });
const {
path: { queryId, name: streamName },
} = params;
await streamsClient.ensureStream(streamName);
await assetClient.unlinkAsset(streamName, {
[ASSET_TYPE]: 'query',
[ASSET_ID]: queryId,
});
return {
acknowledged: true,
};
},
});
const bulkQueriesRoute = createServerRoute({
endpoint: `POST /api/streams/{name}/queries/_bulk 2023-10-31`,
options: {
access: 'public',
summary: 'Bulk update queries',
description: 'Bulk update queries of a stream. Can add new queries and delete existing ones.',
availability: {
stability: 'experimental',
},
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({
name: z.string(),
}),
body: z.object({
operations: z.array(
z.union([
z.object({
index: streamQuerySchema,
}),
z.object({
delete: z.object({ id: z.string() }),
}),
])
),
}),
}),
handler: async ({
params,
request,
getScopedClients,
logger,
}): Promise<BulkUpdateAssetsResponse> => {
const { assetClient, streamsClient } = await getScopedClients({ request });
const {
path: { name: streamName },
body: { operations },
} = params;
await streamsClient.ensureStream(streamName);
const result = await assetClient.bulk(
streamName,
operations.map((operation) => {
if ('index' in operation) {
return {
index: {
asset: {
[ASSET_TYPE]: 'query',
[ASSET_ID]: operation.index.id,
query: {
id: operation.index.id,
title: operation.index.title,
kql: { query: operation.index.kql.query },
},
},
},
};
}
return {
delete: {
asset: {
[ASSET_TYPE]: 'query',
[ASSET_ID]: operation.delete.id,
},
},
};
})
);
if (result.errors) {
logger.error(`Error indexing some items`);
throw internal(`Could not index all items`, { errors: result.errors });
}
return { acknowledged: true };
},
});
export const queryRoutes = {
...listQueriesRoute,
...upsertQueryRoute,
...deleteQueryRoute,
...bulkQueriesRoute,
};

View file

@ -14,12 +14,15 @@ import {
isUnwiredStreamDefinition,
} from '@kbn/streams-schema';
import { IScopedClusterClient } from '@kbn/core/server';
import { partition } from 'lodash';
import { AssetClient } from '../../../lib/streams/assets/asset_client';
import { StreamsClient } from '../../../lib/streams/client';
import {
getDataStreamLifecycle,
getUnmanagedElasticsearchAssets,
} from '../../../lib/streams/stream_crud';
import { DashboardLink } from '../../../../common/assets';
import { ASSET_TYPE } from '../../../lib/streams/assets/fields';
export async function readStream({
name,
@ -32,19 +35,26 @@ export async function readStream({
streamsClient: StreamsClient;
scopedClusterClient: IScopedClusterClient;
}): Promise<StreamGetResponse> {
const [streamDefinition, dashboards] = await Promise.all([
const [streamDefinition, dashboardsAndQueries] = await Promise.all([
streamsClient.getStream(name),
assetClient.getAssetIds({
entityId: name,
entityType: 'stream',
assetType: 'dashboard',
}),
await assetClient.getAssetLinks(name, ['dashboard', 'query']),
]);
const [dashboardLinks, queryLinks] = partition(
dashboardsAndQueries,
(asset): asset is DashboardLink => asset[ASSET_TYPE] === 'dashboard'
);
const dashboards = dashboardLinks.map((dashboard) => dashboard['asset.id']);
const queries = queryLinks.map((query) => {
return query.query;
});
if (isGroupStreamDefinition(streamDefinition)) {
return {
stream: streamDefinition,
dashboards,
queries,
};
}
@ -71,6 +81,7 @@ export async function readStream({
data_stream_exists: !!dataStream,
effective_lifecycle: getDataStreamLifecycle(dataStream),
dashboards,
queries,
inherited_fields: {},
};
}
@ -78,6 +89,7 @@ export async function readStream({
const body: WiredStreamGetResponse = {
stream: streamDefinition,
dashboards,
queries,
effective_lifecycle: findInheritedLifecycle(streamDefinition, ancestors),
inherited_fields: getInheritedFieldsFromAncestors(ancestors),
};

View file

@ -14,6 +14,8 @@ import {
isGroupStreamDefinition,
} from '@kbn/streams-schema';
import { createServerRoute } from '../../create_server_route';
import { ASSET_TYPE, ASSET_UUID } from '../../../lib/streams/assets/fields';
import { QueryAsset } from '../../../../common/assets';
const readGroupRoute = createServerRoute({
endpoint: 'GET /api/streams/{name}/_group 2023-10-31',
@ -90,20 +92,22 @@ const upsertGroupRoute = createServerRoute({
throw badRequest('A group stream name can not start with [logs.]');
}
const assets = await assetClient.getAssets({
entityId: name,
entityType: 'stream',
});
const assets = await assetClient.getAssets(name);
const groupUpsertRequest = params.body;
const dashboards = assets
.filter((asset) => asset.assetType === 'dashboard')
.map((asset) => asset.assetId);
.filter((asset) => asset[ASSET_TYPE] === 'dashboard')
.map((asset) => asset[ASSET_UUID]);
const queries = assets
.filter((asset): asset is QueryAsset => asset[ASSET_TYPE] === 'query')
.map((asset) => asset.query);
const upsertRequest = {
dashboards,
stream: groupUpsertRequest,
queries,
} as GroupStreamUpsertRequest;
return await streamsClient.upsertStream({

View file

@ -15,6 +15,8 @@ import {
} from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import { createServerRoute } from '../../create_server_route';
import { ASSET_ID, ASSET_TYPE } from '../../../lib/streams/assets/fields';
import { QueryAsset } from '../../../../common/assets';
const readIngestRoute = createServerRoute({
endpoint: 'GET /api/streams/{name}/_ingest 2023-10-31',
@ -94,20 +96,22 @@ const upsertIngestRoute = createServerRoute({
const name = params.path.name;
const assets = await assetClient.getAssets({
entityId: name,
entityType: 'stream',
});
const assets = await assetClient.getAssets(name);
const ingestUpsertRequest = params.body;
const dashboards = assets
.filter((asset) => asset.assetType === 'dashboard')
.map((asset) => asset.assetId);
.filter((asset) => asset[ASSET_TYPE] === 'dashboard')
.map((asset) => asset[ASSET_ID]);
const queries = assets
.filter((asset): asset is QueryAsset => asset[ASSET_TYPE] === 'query')
.map((asset) => asset.query);
const upsertRequest = {
dashboards,
stream: ingestUpsertRequest,
queries,
} as StreamUpsertRequest;
return await streamsClient.upsertStream({

View file

@ -0,0 +1,131 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { AggregationsDateHistogramAggregate } from '@elastic/elasticsearch/lib/api/types';
import { IScopedClusterClient } from '@kbn/core/server';
import { buildEsQuery } from '@kbn/es-query';
import { ChangePointType } from '@kbn/es-types/src';
import { SignificantEventsGetResponse } from '@kbn/streams-schema';
import { get, isArray, isEmpty } from 'lodash';
import { AssetClient } from '../../../lib/streams/assets/asset_client';
export async function readSignificantEvents(
params: { name: string; from: Date; to: Date; bucketSize: string },
dependencies: {
assetClient: AssetClient;
scopedClusterClient: IScopedClusterClient;
}
): Promise<SignificantEventsGetResponse> {
const { assetClient, scopedClusterClient } = dependencies;
const { name, from, to, bucketSize } = params;
const assetQueries = await assetClient.getAssetLinks(name, ['query']);
if (isEmpty(assetQueries)) {
return [];
}
const searchRequests = assetQueries.flatMap((asset) => {
return [
{ index: name },
{
size: 0,
query: {
bool: {
filter: [
{
range: {
'@timestamp': {
gte: from.toISOString(),
lte: to.toISOString(),
},
},
},
buildQuery(asset.query.kql.query),
],
},
},
aggs: {
occurrences: {
date_histogram: {
field: '@timestamp',
fixed_interval: bucketSize,
extended_bounds: {
min: from.toISOString(),
max: to.toISOString(),
},
},
},
change_points: {
change_point: {
buckets_path: 'occurrences>_count',
},
},
},
},
];
});
const response = await scopedClusterClient.asCurrentUser.msearch<
unknown,
{
occurrences: AggregationsDateHistogramAggregate;
change_points: {
type: {
[key in ChangePointType]: { p_value: number; change_point: number };
};
};
}
>({ searches: searchRequests });
const significantEvents = response.responses.map((queryResponse, queryIndex) => {
const query = assetQueries[queryIndex];
if ('error' in queryResponse) {
return {
id: query.query.id,
title: query.query.title,
kql: query.query.kql,
occurrences: [],
change_points: {},
};
}
const buckets = get(queryResponse, 'aggregations.occurrences.buckets');
const changePoints = get(queryResponse, 'aggregations.change_points') ?? {};
return {
id: query.query.id,
title: query.query.title,
kql: query.query.kql,
occurrences: isArray(buckets)
? buckets.map((bucket) => ({
date: bucket.key_as_string,
count: bucket.doc_count,
}))
: [],
change_points: changePoints,
};
});
// changePoints type is not inferred correclty
return significantEvents as SignificantEventsGetResponse;
}
function buildQuery(kql: string) {
try {
return buildEsQuery(
undefined,
{
query: kql,
language: 'kuery',
},
[],
{ allowLeadingWildcards: true }
);
} catch (err) {
return { match_all: {} };
}
}

View file

@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { badRequest } from '@hapi/boom';
import { SignificantEventsGetResponse } from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import { createServerRoute } from '../../create_server_route';
import { readSignificantEvents } from './read_significant_events';
export const readSignificantEventsRoute = createServerRoute({
endpoint: 'GET /api/streams/{name}/significant_events 2023-10-31',
params: z.object({
path: z.object({ name: z.string() }),
query: z.object({ from: z.coerce.date(), to: z.coerce.date(), bucketSize: z.string() }),
}),
options: {
access: 'public',
summary: 'Read the significant events',
description: 'Read the significant events',
availability: {
stability: 'experimental',
},
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
handler: async ({ params, request, getScopedClients }): Promise<SignificantEventsGetResponse> => {
const { streamsClient, assetClient, scopedClusterClient } = await getScopedClients({
request,
});
const isStreamEnabled = await streamsClient.isStreamsEnabled();
if (!isStreamEnabled) {
throw badRequest('Streams are not enabled');
}
const { name } = params.path;
const { from, to, bucketSize } = params.query;
return await readSignificantEvents(
{ name, from, to, bucketSize },
{ assetClient, scopedClusterClient }
);
},
});
export const significantEventsRoutes = {
...readSignificantEventsRoute,
};

View file

@ -42,6 +42,7 @@
"@kbn/core-elasticsearch-client-server-internal",
"@kbn/utils",
"@kbn/core-saved-objects-server-internal",
"@kbn/core-analytics-server"
"@kbn/core-analytics-server",
"@kbn/es-types",
]
}

View file

@ -5,19 +5,21 @@
* 2.0.
*/
import { ChartsPluginStart } from '@kbn/charts-plugin/public';
import { coreMock } from '@kbn/core/public/mocks';
import type { DataPublicPluginStart } from '@kbn/data-plugin/public';
import type { DataViewsPublicPluginStart } from '@kbn/data-views-plugin/public';
import type { StreamsPluginStart } from '@kbn/streams-plugin/public';
import type { UnifiedSearchPublicPluginStart } from '@kbn/unified-search-plugin/public';
import type { SharePublicStart } from '@kbn/share-plugin/public/plugin';
import { NavigationPublicStart } from '@kbn/navigation-plugin/public/types';
import type { SavedObjectTaggingPluginStart } from '@kbn/saved-objects-tagging-plugin/public';
import { fieldsMetadataPluginPublicMock } from '@kbn/fields-metadata-plugin/public/mocks';
import { DataStreamsStatsClient } from '@kbn/dataset-quality-plugin/public/services/data_streams_stats/data_streams_stats_client';
import { LicensingPluginStart } from '@kbn/licensing-plugin/public';
import type { DiscoverStart } from '@kbn/discover-plugin/public';
import { fieldsMetadataPluginPublicMock } from '@kbn/fields-metadata-plugin/public/mocks';
import { IndexManagementPluginStart } from '@kbn/index-management-shared-types';
import { IngestPipelinesPluginStart } from '@kbn/ingest-pipelines-plugin/public';
import { LicensingPluginStart } from '@kbn/licensing-plugin/public';
import { NavigationPublicStart } from '@kbn/navigation-plugin/public/types';
import type { SavedObjectTaggingPluginStart } from '@kbn/saved-objects-tagging-plugin/public';
import type { SharePublicStart } from '@kbn/share-plugin/public/plugin';
import type { StreamsPluginStart } from '@kbn/streams-plugin/public';
import type { UnifiedSearchPublicPluginStart } from '@kbn/unified-search-plugin/public';
import { DiscoverSharedPublicStart } from '@kbn/discover-shared-plugin/public';
import { ObservabilityAIAssistantPublicStart } from '@kbn/observability-ai-assistant-plugin/public';
import type { StreamsAppKibanaContext } from '../public/hooks/use_kibana';
@ -48,6 +50,8 @@ export function getMockStreamsAppContext(): StreamsAppKibanaContext {
indexManagement: {} as unknown as IndexManagementPluginStart,
ingestPipelines: {} as unknown as IngestPipelinesPluginStart,
discoverShared: {} as unknown as DiscoverSharedPublicStart,
charts: {} as unknown as ChartsPluginStart,
discover: {} as unknown as DiscoverStart,
observabilityAIAssistant: {} as unknown as ObservabilityAIAssistantPublicStart,
},
},

View file

@ -10,9 +10,11 @@
"browser": true,
"configPath": ["xpack", "streamsApp"],
"requiredPlugins": [
"charts",
"data",
"datasetQuality",
"dataViews",
"discover",
"discoverShared",
"fieldsMetadata",
"licensing",
@ -23,10 +25,8 @@
"savedObjectsTagging",
"share",
"streams",
"unifiedSearch",
],
"requiredBundles": [
"kibanaReact"
"unifiedSearch"
],
"requiredBundles": ["kibanaReact"]
}
}

View file

@ -54,7 +54,7 @@ export function DashboardsTable({
name: i18n.translate('xpack.streams.dashboardTable.dashboardNameColumnTitle', {
defaultMessage: 'Dashboard name',
}),
render: (_, { label, id }) => (
render: (_, { title, id }) => (
<EuiLink
data-test-subj="streamsAppColumnsLink"
onClick={() => {
@ -71,7 +71,7 @@ export function DashboardsTable({
}
}}
>
{label}
{title}
</EuiLink>
),
},

View file

@ -33,7 +33,7 @@ export function StreamDetailDashboardsView({
const filteredDashboards = useMemo(() => {
return linkedDashboards.filter((dashboard) => {
return dashboard.label.toLowerCase().includes(query.toLowerCase());
return dashboard.title.toLowerCase().includes(query.toLowerCase());
});
}, [linkedDashboards, query]);

View file

@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { IngestStreamGetResponse } from '@kbn/streams-schema';
import React from 'react';
export function StreamDetailSignificantEventsView({
definition,
}: {
definition?: IngestStreamGetResponse;
}) {
return <></>;
}

View file

@ -5,16 +5,16 @@
* 2.0.
*/
import { i18n } from '@kbn/i18n';
import React from 'react';
import { Outlet } from '@kbn/typed-react-router-config';
import React from 'react';
import { useKibana } from '../../hooks/use_kibana';
import { useStreamsAppParams } from '../../hooks/use_streams_app_params';
import { EntityDetailViewWithoutParams, EntityViewTab } from '../entity_detail_view';
import { StreamDetailDashboardsView } from '../stream_detail_dashboards_view';
import { StreamDetailManagement } from '../data_management/stream_detail_management';
import { StreamDetailOverview } from '../stream_detail_overview';
import { StreamDetailContextProvider, useStreamDetail } from '../../hooks/use_stream_detail';
import { useStreamsAppParams } from '../../hooks/use_streams_app_params';
import { StreamDetailManagement } from '../data_management/stream_detail_management';
import { EntityDetailViewWithoutParams, EntityViewTab } from '../entity_detail_view';
import { RedirectTo } from '../redirect_to';
import { StreamDetailDashboardsView } from '../stream_detail_dashboards_view';
import { StreamDetailOverview } from '../stream_detail_overview';
export function StreamDetailView() {
const { streamsRepositoryClient } = useKibana().dependencies.start.streams;

View file

@ -4,31 +4,33 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ChartsPluginStart } from '@kbn/charts-plugin/public';
import { AppMountParameters } from '@kbn/core/public';
import type { DataPublicPluginSetup, DataPublicPluginStart } from '@kbn/data-plugin/public';
import type {
DataViewsPublicPluginSetup,
DataViewsPublicPluginStart,
} from '@kbn/data-views-plugin/public';
import type { StreamsPluginSetup, StreamsPluginStart } from '@kbn/streams-plugin/public';
import type { UnifiedSearchPublicPluginStart } from '@kbn/unified-search-plugin/public';
import type { SharePublicSetup, SharePublicStart } from '@kbn/share-plugin/public/plugin';
import type { SavedObjectTaggingPluginStart } from '@kbn/saved-objects-tagging-plugin/public';
import { NavigationPublicStart } from '@kbn/navigation-plugin/public/types';
import { FieldsMetadataPublicStart } from '@kbn/fields-metadata-plugin/public';
import {
ObservabilityAIAssistantPublicSetup,
ObservabilityAIAssistantPublicStart,
} from '@kbn/observability-ai-assistant-plugin/public';
import { AppMountParameters } from '@kbn/core/public';
import { LicensingPluginStart } from '@kbn/licensing-plugin/public';
import { IndexManagementPluginStart } from '@kbn/index-management-shared-types';
import { IngestPipelinesPluginStart } from '@kbn/ingest-pipelines-plugin/public';
import { DiscoverStart } from '@kbn/discover-plugin/public';
import {
DiscoverSharedPublicSetup,
DiscoverSharedPublicStart,
} from '@kbn/discover-shared-plugin/public';
/* eslint-disable @typescript-eslint/no-empty-interface*/
import { FieldsMetadataPublicStart } from '@kbn/fields-metadata-plugin/public';
import { IndexManagementPluginStart } from '@kbn/index-management-shared-types';
import { IngestPipelinesPluginStart } from '@kbn/ingest-pipelines-plugin/public';
import { LicensingPluginStart } from '@kbn/licensing-plugin/public';
import { NavigationPublicStart } from '@kbn/navigation-plugin/public/types';
import {
ObservabilityAIAssistantPublicSetup,
ObservabilityAIAssistantPublicStart,
} from '@kbn/observability-ai-assistant-plugin/public';
import type { SavedObjectTaggingPluginStart } from '@kbn/saved-objects-tagging-plugin/public';
import type { SharePublicSetup, SharePublicStart } from '@kbn/share-plugin/public/plugin';
import type { StreamsPluginSetup, StreamsPluginStart } from '@kbn/streams-plugin/public';
import type { UnifiedSearchPublicPluginStart } from '@kbn/unified-search-plugin/public';
/* eslint-disable @typescript-eslint/no-empty-interface*/
export interface ConfigSchema {}
export interface StreamsApplicationProps {
@ -49,13 +51,15 @@ export interface StreamsAppSetupDependencies {
}
export interface StreamsAppStartDependencies {
charts: ChartsPluginStart;
data: DataPublicPluginStart;
dataViews: DataViewsPublicPluginStart;
discover: DiscoverStart;
discoverShared: DiscoverSharedPublicStart;
fieldsMetadata: FieldsMetadataPublicStart;
licensing: LicensingPluginStart;
indexManagement: IndexManagementPluginStart;
ingestPipelines: IngestPipelinesPluginStart;
licensing: LicensingPluginStart;
navigation: NavigationPublicStart;
observabilityAIAssistant: ObservabilityAIAssistantPublicStart;
savedObjectsTagging: SavedObjectTaggingPluginStart;

View file

@ -27,6 +27,8 @@
"@kbn/react-kibana-context-render",
"@kbn/code-editor",
"@kbn/ui-theme",
"@kbn/charts-plugin",
"@kbn/discover-plugin",
"@kbn/kibana-react-plugin",
"@kbn/es-query",
"@kbn/server-route-repository-client",
@ -64,6 +66,6 @@
"@kbn/core-analytics-browser",
"@kbn/index-management-shared-types",
"@kbn/ingest-pipelines-plugin",
"@kbn/deeplinks-observability",
"@kbn/deeplinks-observability"
]
}

View file

@ -135,8 +135,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
after(async () => {
await unloadDashboards();
await unlinkDashboard(SEARCH_DASHBOARD_ID);
await unloadDashboards();
});
it('lists the dashboard in the stream response', async () => {
@ -177,7 +177,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
it('recovers on write and lists the linked dashboard ', async () => {
await unlinkDashboard(SEARCH_DASHBOARD_ID);
await linkDashboard(SEARCH_DASHBOARD_ID);
const response = await apiClient.fetch('GET /api/streams/{name}/dashboards 2023-10-31', {

View file

@ -63,6 +63,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
},
body: {
dashboards: [],
queries: [],
stream: {
ingest: {
lifecycle: { inherit: {} },
@ -98,12 +99,14 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const {
dashboards,
queries,
stream,
effective_lifecycle: effectiveLifecycle,
elasticsearch_assets: elasticsearchAssets,
} = body;
expect(dashboards).to.eql([]);
expect(queries).to.eql([]);
expect(stream).to.eql({
name: TEST_STREAM_NAME,
@ -159,6 +162,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
params: {
path: { name: TEST_STREAM_NAME },
body: {
queries: [],
dashboards: [],
stream: {
ingest: {
@ -255,6 +259,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
name: DATA_STREAM_NAME,
},
body: {
queries: [],
dashboards: [],
stream: {
ingest: {
@ -319,6 +324,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
},
body: {
dashboards: [],
queries: [],
stream: {
ingest: {
lifecycle: { inherit: {} },

View file

@ -52,6 +52,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('Place processing steps', async () => {
const body: IngestStreamUpsertRequest = {
dashboards: [],
queries: [],
stream: {
ingest: {
lifecycle: { inherit: {} },

View file

@ -359,6 +359,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('should allow to update field type to incompatible type', async () => {
const body: IngestStreamUpsertRequest = {
dashboards: [],
queries: [],
stream: {
ingest: {
lifecycle: { inherit: {} },
@ -401,6 +402,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('should not allow to update field type to system', async () => {
const body: IngestStreamUpsertRequest = {
dashboards: [],
queries: [],
stream: {
ingest: {
lifecycle: { inherit: {} },

View file

@ -46,6 +46,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
},
},
dashboards: [],
queries: [],
},
},
})
@ -65,6 +66,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
},
},
dashboards: [],
queries: [],
},
},
})
@ -84,6 +86,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
},
},
dashboards: [],
queries: [],
},
},
})
@ -102,6 +105,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
},
},
dashboards: [],
queries: [],
},
},
})
@ -120,6 +124,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
},
},
dashboards: [],
queries: [],
},
},
})
@ -152,6 +157,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
},
},
dashboards: [],
queries: [],
});
});
@ -208,6 +214,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
},
},
dashboards: [],
queries: [],
},
},
})
@ -226,6 +233,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
},
},
dashboards: [],
queries: [],
},
},
})

View file

@ -9,7 +9,7 @@ import { StreamUpsertRequest } from '@kbn/streams-schema';
import expect from '@kbn/expect';
import { StreamsSupertestRepositoryClient } from './repository_client';
type StreamPutItem = Omit<StreamUpsertRequest, 'dashboards'> & { name: string };
type StreamPutItem = Omit<StreamUpsertRequest, 'dashboards' | 'queries'> & { name: string };
const streams: StreamPutItem[] = [
{
@ -135,6 +135,7 @@ export async function createStreams(apiClient: StreamsSupertestRepositoryClient)
body: {
...stream,
dashboards: [],
queries: [],
} as StreamUpsertRequest,
path: { name },
},

View file

@ -119,3 +119,18 @@ export async function getIlmStats(
.expect(expectStatusCode)
.then((response) => response.body);
}
export async function getQueries(
apiClient: StreamsSupertestRepositoryClient,
name: string,
expectStatusCode: number = 200
) {
return await apiClient
.fetch('GET /api/streams/{name}/queries 2023-10-31', {
params: {
path: { name },
},
})
.expect(expectStatusCode)
.then((response) => response.body);
}

View file

@ -19,6 +19,8 @@ export default function ({ loadTestFile }: DeploymentAgnosticFtrProviderContext)
loadTestFile(require.resolve('./root_stream'));
loadTestFile(require.resolve('./group_streams'));
loadTestFile(require.resolve('./lifecycle'));
loadTestFile(require.resolve('./significant_events'));
loadTestFile(require.resolve('./queries'));
loadTestFile(require.resolve('./discover'));
});
}

View file

@ -91,6 +91,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
},
},
dashboards: [],
queries: [],
};
describe('Wired streams update', () => {
@ -99,6 +100,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const response = await putStream(apiClient, 'logs', {
dashboards: [],
queries: [],
stream: {
ingest: {
...(rootDefinition as WiredStreamGetResponse).stream.ingest,
@ -126,6 +128,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
'logs',
{
dashboards: [],
queries: [],
stream: {
ingest: {
...(rootDefinition as WiredStreamGetResponse).stream.ingest,
@ -146,6 +149,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const rootDefinition = await getStream(apiClient, 'logs');
await putStream(apiClient, 'logs', {
dashboards: [],
queries: [],
stream: {
ingest: {
...(rootDefinition as WiredStreamGetResponse).stream.ingest,
@ -155,6 +159,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
await putStream(apiClient, 'logs.overrides', {
dashboards: [],
queries: [],
stream: {
ingest: {
...wiredPutBody.stream.ingest,
@ -181,6 +186,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('applies the nearest parent lifecycle when deleted', async () => {
await putStream(apiClient, 'logs.10d', {
dashboards: [],
queries: [],
stream: {
ingest: {
...wiredPutBody.stream.ingest,
@ -190,6 +196,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
await putStream(apiClient, 'logs.10d.20d', {
dashboards: [],
queries: [],
stream: {
ingest: {
...wiredPutBody.stream.ingest,
@ -202,6 +209,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
// delete lifecycle of the 20d override
await putStream(apiClient, 'logs.10d.20d', {
dashboards: [],
queries: [],
stream: {
ingest: {
...wiredPutBody.stream.ingest,
@ -222,6 +230,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('handles no retention dsl', async () => {
await putStream(apiClient, 'logs.no', {
dashboards: [],
queries: [],
stream: {
ingest: {
...wiredPutBody.stream.ingest,
@ -232,6 +241,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
await putStream(apiClient, 'logs.no.retention', {
dashboards: [],
queries: [],
stream: {
ingest: {
...wiredPutBody.stream.ingest,
@ -253,6 +263,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
'logs.ilm',
{
dashboards: [],
queries: [],
stream: {
ingest: {
...wiredPutBody.stream.ingest,
@ -268,6 +279,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
await putStream(apiClient, 'logs.ilm.stream', wiredPutBody);
await putStream(apiClient, 'logs.ilm', {
dashboards: [],
queries: [],
stream: {
ingest: {
...wiredPutBody.stream.ingest,
@ -290,6 +302,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const name = 'logs.ilm-with-backing-indices';
await putStream(apiClient, name, {
dashboards: [],
queries: [],
stream: {
ingest: {
...wiredPutBody.stream.ingest,
@ -303,6 +316,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
await putStream(apiClient, name, {
dashboards: [],
queries: [],
stream: {
ingest: {
...wiredPutBody.stream.ingest,
@ -322,6 +336,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const name = 'logs.dlm-with-backing-indices';
await putStream(apiClient, name, {
dashboards: [],
queries: [],
stream: {
ingest: {
...wiredPutBody.stream.ingest,
@ -339,6 +354,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
await putStream(apiClient, name, {
dashboards: [],
queries: [],
stream: {
ingest: {
...wiredPutBody.stream.ingest,
@ -366,6 +382,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
},
},
dashboards: [],
queries: [],
};
const createDataStream = async (name: string, lifecycle: IngestStreamLifecycle) => {
@ -416,6 +433,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
await putStream(apiClient, indexName, {
dashboards: [],
queries: [],
stream: {
ingest: {
...unwiredPutBody.stream.ingest,
@ -439,6 +457,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
indexName,
{
dashboards: [],
queries: [],
stream: {
ingest: {
...unwiredPutBody.stream.ingest,
@ -459,6 +478,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const indexName = 'logs.dslnostats';
await putStream(apiClient, indexName, {
dashboards: [],
queries: [],
stream: {
ingest: {
...wiredPutBody.stream.ingest,
@ -478,6 +498,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const indexName = 'logs.ilmpolicydontexists';
await putStream(apiClient, indexName, {
dashboards: [],
queries: [],
stream: {
ingest: {
...wiredPutBody.stream.ingest,
@ -508,6 +529,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
await putStream(apiClient, indexName, {
dashboards: [],
queries: [],
stream: {
ingest: {
...wiredPutBody.stream.ingest,

View file

@ -0,0 +1,238 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import { WiredIngestUpsertRequest } from '@kbn/streams-schema';
import { v4 } from 'uuid';
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
import {
StreamsSupertestRepositoryClient,
createStreamsRepositoryAdminClient,
} from './helpers/repository_client';
import { disableStreams, enableStreams, getQueries, putStream } from './helpers/requests';
export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const roleScopedSupertest = getService('roleScopedSupertest');
let apiClient: StreamsSupertestRepositoryClient;
const STREAM_NAME = 'logs.queries-test';
const stream: WiredIngestUpsertRequest = {
ingest: {
lifecycle: { inherit: {} },
processing: [],
wired: {
routing: [],
fields: {
numberfield: {
type: 'long',
},
},
},
},
};
describe('Queries API', () => {
before(async () => {
apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest);
await enableStreams(apiClient);
});
after(async () => {
await disableStreams(apiClient);
});
beforeEach(async () => {
await putStream(apiClient, STREAM_NAME, {
stream,
dashboards: [],
queries: [],
});
});
it('lists empty queries when none are defined on the stream', async () => {
const response = await getQueries(apiClient, STREAM_NAME);
expect(response).to.eql({ queries: [] });
});
it('lists queries when defined on the stream', async () => {
const queries = [
{ id: v4(), title: 'OutOfMemoryError', kql: { query: "message:'OutOfMemoryError'" } },
{
id: v4(),
title: 'cluster_block_exception',
kql: { query: "message:'cluster_block_exception'" },
},
];
const updateStreamResponse = await putStream(apiClient, STREAM_NAME, {
stream,
dashboards: [],
queries,
});
expect(updateStreamResponse).to.have.property('acknowledged', true);
const getQueriesResponse = await getQueries(apiClient, STREAM_NAME);
expect(getQueriesResponse.queries).to.eql(queries);
});
it('inserts a query when inexistant', async () => {
const query = { id: v4(), title: 'Significant Query', kql: { query: "message:'query'" } };
const upsertQueryResponse = await apiClient
.fetch('PUT /api/streams/{name}/queries/{queryId} 2023-10-31', {
params: {
path: { name: STREAM_NAME, queryId: query.id },
body: {
title: query.title,
kql: query.kql,
},
},
})
.expect(200)
.then((res) => res.body);
expect(upsertQueryResponse.acknowledged).to.be(true);
const getQueriesResponse = await getQueries(apiClient, STREAM_NAME);
expect(getQueriesResponse.queries).to.eql([query]);
});
it('updates a query when already defined', async () => {
const queryId = v4();
await putStream(apiClient, STREAM_NAME, {
stream,
dashboards: [],
queries: [
{
id: queryId,
title: 'Significant Query',
kql: { query: "message:'query'" },
},
],
});
const upsertQueryResponse = await apiClient
.fetch('PUT /api/streams/{name}/queries/{queryId} 2023-10-31', {
params: {
path: { name: STREAM_NAME, queryId },
body: {
title: 'Another title',
kql: { query: "message:'Something else'" },
},
},
})
.expect(200)
.then((res) => res.body);
expect(upsertQueryResponse.acknowledged).to.be(true);
const getQueriesResponse = await getQueries(apiClient, STREAM_NAME);
expect(getQueriesResponse.queries).to.eql([
{
id: queryId,
title: 'Another title',
kql: { query: "message:'Something else'" },
},
]);
});
it('deletes an existing query successfully', async () => {
const queryId = v4();
await putStream(apiClient, STREAM_NAME, {
stream,
dashboards: [],
queries: [
{
id: queryId,
title: 'Significant Query',
kql: { query: "message:'query'" },
},
],
});
const deleteQueryResponse = await apiClient
.fetch('DELETE /api/streams/{name}/queries/{queryId} 2023-10-31', {
params: { path: { name: STREAM_NAME, queryId } },
})
.expect(200)
.then((res) => res.body);
expect(deleteQueryResponse.acknowledged).to.be(true);
const getQueriesResponse = await getQueries(apiClient, STREAM_NAME);
expect(getQueriesResponse.queries).to.eql([]);
});
it('throws when deleting an inexistant query', async () => {
const queryId = v4();
await apiClient
.fetch('DELETE /api/streams/{name}/queries/{queryId} 2023-10-31', {
params: { path: { name: STREAM_NAME, queryId } },
})
.expect(404);
const getQueriesResponse = await getQueries(apiClient, STREAM_NAME);
expect(getQueriesResponse.queries).to.eql([]);
});
it('bulks insert and remove queries', async () => {
const existingQueryId = v4();
await putStream(apiClient, STREAM_NAME, {
stream,
dashboards: [],
queries: [
{
id: existingQueryId,
title: 'Significant Query',
kql: { query: "message:'query'" },
},
],
});
const newQuery1 = {
id: v4(),
title: 'query1',
kql: { query: 'irrelevant1' },
};
const newQuery2 = {
id: v4(),
title: 'query2',
kql: { query: 'irrelevant2' },
};
const bulkResponse = await apiClient
.fetch('POST /api/streams/{name}/queries/_bulk 2023-10-31', {
params: {
path: { name: STREAM_NAME },
body: {
operations: [
{
index: newQuery1,
},
{
delete: {
id: 'inexistant',
},
},
{
index: newQuery2,
},
{
delete: {
id: existingQueryId,
},
},
],
},
},
})
.expect(200)
.then((res) => res.body);
expect(bulkResponse).to.have.property('acknowledged', true);
const getQueriesResponse = await getQueries(apiClient, STREAM_NAME);
expect(getQueriesResponse.queries).to.eql([newQuery1, newQuery2]);
});
});
}

View file

@ -60,6 +60,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('Should not allow processing changes', async () => {
const body: IngestStreamUpsertRequest = {
dashboards: [],
queries: [],
stream: {
ingest: {
...rootStreamDefinition.ingest,
@ -87,6 +88,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('Should not allow fields changes', async () => {
const body: IngestStreamUpsertRequest = {
dashboards: [],
queries: [],
stream: {
ingest: {
...rootStreamDefinition.ingest,
@ -109,6 +111,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('Should allow routing changes', async () => {
const body: IngestStreamUpsertRequest = {
dashboards: [],
queries: [],
stream: {
ingest: {
...rootStreamDefinition.ingest,

View file

@ -0,0 +1,133 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import {
IngestStreamLifecycle,
IngestStreamUpsertRequest,
WiredStreamGetResponse,
isDslLifecycle,
isIlmLifecycle,
} from '@kbn/streams-schema';
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
import {
StreamsSupertestRepositoryClient,
createStreamsRepositoryAdminClient,
} from './helpers/repository_client';
import { disableStreams, enableStreams, getStream, putStream } from './helpers/requests';
export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const roleScopedSupertest = getService('roleScopedSupertest');
const esClient = getService('es');
let apiClient: StreamsSupertestRepositoryClient;
describe('Significant Events', () => {
before(async () => {
apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest);
await enableStreams(apiClient);
});
after(async () => {
await disableStreams(apiClient);
});
describe('Wired streams update', () => {
it('updates the queries', async () => {
let streamDefinition = await getStream(apiClient, 'logs');
expect(streamDefinition.queries.length).to.eql(0);
const response = await putStream(apiClient, 'logs', {
stream: {
ingest: {
...(streamDefinition as WiredStreamGetResponse).stream.ingest,
},
},
dashboards: [],
queries: [{ id: 'aaa', title: 'OOM Error', kql: { query: "message: 'OOM Error'" } }],
});
expect(response).to.have.property('acknowledged', true);
streamDefinition = await getStream(apiClient, 'logs');
expect(streamDefinition.queries.length).to.eql(1);
expect(streamDefinition.queries[0]).to.eql({
id: 'aaa',
title: 'OOM Error',
kql: { query: "message: 'OOM Error'" },
});
});
});
describe('Unwired streams update', () => {
const unwiredPutBody: IngestStreamUpsertRequest = {
stream: {
ingest: {
lifecycle: { inherit: {} },
processing: [],
unwired: {},
},
},
dashboards: [],
queries: [],
};
const createDataStream = async (name: string, lifecycle: IngestStreamLifecycle) => {
await esClient.indices.putIndexTemplate({
name,
index_patterns: [name],
data_stream: {},
template: isDslLifecycle(lifecycle)
? {
lifecycle: { data_retention: lifecycle.dsl.data_retention },
settings: {
'index.lifecycle.prefer_ilm': false,
'index.default_pipeline': 'logs@default-pipeline',
},
}
: isIlmLifecycle(lifecycle)
? {
settings: {
'index.default_pipeline': 'logs@default-pipeline',
'index.lifecycle.prefer_ilm': true,
'index.lifecycle.name': lifecycle.ilm.policy,
},
}
: undefined,
});
await esClient.indices.createDataStream({ name });
return async () => {
await esClient.indices.deleteDataStream({ name });
await esClient.indices.deleteIndexTemplate({ name });
};
};
it('updates the queries', async () => {
const indexName = 'unwired-stream-queries';
const clean = await createDataStream(indexName, { dsl: { data_retention: '77d' } });
await putStream(apiClient, indexName, unwiredPutBody);
let streamDefinition = await getStream(apiClient, indexName);
expect(streamDefinition.queries.length).to.eql(0);
await putStream(apiClient, indexName, {
...unwiredPutBody,
queries: [{ id: 'aaa', title: 'OOM Error', kql: { query: "message: 'OOM Error'" } }],
});
streamDefinition = await getStream(apiClient, indexName);
expect(streamDefinition.queries.length).to.eql(1);
expect(streamDefinition.queries[0]).to.eql({
id: 'aaa',
title: 'OOM Error',
kql: { query: "message: 'OOM Error'" },
});
await clean();
});
});
});
}