🌊 Streams: Prepare API for publishing (#213127)

Add streams API to documentation as an experimental feature

<img width="2555" alt="Screenshot 2025-03-07 at 11 44 54"
src="https://github.com/user-attachments/assets/f54e5e6e-0c20-4bad-9cff-27747d0f76e2"
/>

There are a couple of changes in here:
* Split streams API in internal and public and mark the public parts as
experimental
* Add the public parts to the Kibana documentation
* Add description and summary
* Adjust the server repository wrapper to pass through summary and
description

# To test

* Generate OAS bundle: `node scripts/capture_oas_snapshot --include-path
/api/streams --update`
* Apply overlays `cd oas_docs && make api-docs`
* Make sure bump.sh is installed (`npm install -g bump-cli`)
* Run for preview: `cd oas_docs && bump preview output/kibana.yaml`

# Open questions

* Does the split into public and internal make sense?
* Is it a problem if this is visible in the user-facing documentation
page before we actually release streams? Or would it be OK if the API is
marked as experimental? (mostly a question for @LucaWintergerst )

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Joe Reuter 2025-03-13 14:41:05 +01:00 committed by GitHub
parent d0aecedbb3
commit 4681b6c562
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
56 changed files with 11621 additions and 326 deletions

View file

@ -8,7 +8,7 @@ source .buildkite/scripts/common/util.sh
.buildkite/scripts/copy_es_snapshot_cache.sh
echo --- Capture OAS snapshot
cmd="node scripts/capture_oas_snapshot --include-path /api/status --include-path /api/alerting/rule/ --include-path /api/alerting/rules --include-path /api/actions --include-path /api/security/role --include-path /api/spaces --include-path /api/fleet --include-path /api/dashboards"
cmd="node scripts/capture_oas_snapshot --include-path /api/status --include-path /api/alerting/rule/ --include-path /api/alerting/rules --include-path /api/actions --include-path /api/security/role --include-path /api/spaces --include-path /api/streams --include-path /api/fleet --include-path /api/dashboards"
if is_pr && ! is_auto_commit_disabled; then
cmd="$cmd --update"
fi

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -32,6 +32,12 @@ actions:
description: Alerting documentation
url: https://www.elastic.co/docs/current/serverless/rules
x-displayName: "Alerting"
- target: '$.tags[?(@.name=="streams")]'
description: Change tag description and displayName
update:
description: >
Streams is a new and experimental way to manage your data in Kibana. The API is currently not considered stable and can change at any point.
x-displayName: "Streams"
- target: '$.tags[?(@.name=="connectors")]'
description: Change tag description and displayName
update:

View file

@ -47,6 +47,12 @@ actions:
description: Alerting documentation
url: https://www.elastic.co/guide/en/kibana/master/alerting-getting-started.html
x-displayName: "Alerting"
- target: '$.tags[?(@.name=="streams")]'
description: Change tag description and displayName
update:
description: >
Streams is a new and experimental way to manage your data in Kibana (currently experimental - expect changes).
x-displayName: "Streams"
- target: '$.tags[?(@.name=="cases")]'
description: Change tag description and displayName
update:

View file

@ -191,6 +191,8 @@ export function registerRoutes<TDependencies extends Record<string, any>>({
router.versioned[method]({
path: pathname,
access,
summary: options.summary,
description: options.description,
// @ts-expect-error we are essentially calling multiple methods at the same type so TS gets confused
options: omit(options, 'access', 'description', 'summary', 'deprecated', 'discontinued'),
security,

View file

@ -20,6 +20,10 @@ export const createServerRoute: CreateServerRouteFactory<
> = ({ handler, ...config }) => {
return createPlainStreamsServerRoute({
...config,
options: {
...config.options,
tags: [...(config.options?.tags ?? []), 'oas-tag:streams'],
},
handler: (options) => {
return handler(options).catch((error) => {
if (error instanceof StatusError || error instanceof errors.ResponseError) {

View file

@ -48,9 +48,15 @@ function sanitizeDashboardAsset(asset: DashboardAsset): SanitizedDashboardAsset
}
const listDashboardsRoute = createServerRoute({
endpoint: 'GET /api/streams/{name}/dashboards',
endpoint: 'GET /api/streams/{name}/dashboards 2023-10-31',
options: {
access: 'internal',
access: 'public',
summary: 'Get stream dashboards',
description:
'Fetches all dashboards linked to a stream that are visible to the current user in the current space.',
availability: {
stability: 'experimental',
},
},
params: z.object({
path: z.object({
@ -90,9 +96,15 @@ const listDashboardsRoute = createServerRoute({
});
const linkDashboardRoute = createServerRoute({
endpoint: 'PUT /api/streams/{name}/dashboards/{dashboardId}',
endpoint: 'PUT /api/streams/{name}/dashboards/{dashboardId} 2023-10-31',
options: {
access: 'internal',
access: 'public',
summary: 'Link a dashboard to a stream',
description:
'Links a dashboard to a stream. Noop if the dashboard is already linked to the stream.',
availability: {
stability: 'experimental',
},
},
security: {
authz: {
@ -131,9 +143,15 @@ const linkDashboardRoute = createServerRoute({
});
const unlinkDashboardRoute = createServerRoute({
endpoint: 'DELETE /api/streams/{name}/dashboards/{dashboardId}',
endpoint: 'DELETE /api/streams/{name}/dashboards/{dashboardId} 2023-10-31',
options: {
access: 'internal',
access: 'public',
summary: 'Unlink a dashboard from a stream',
description:
'Unlinks a dashboard from a stream. Noop if the dashboard is not linked to the stream.',
availability: {
stability: 'experimental',
},
},
security: {
authz: {
@ -171,7 +189,7 @@ const unlinkDashboardRoute = createServerRoute({
});
const suggestDashboardsRoute = createServerRoute({
endpoint: 'POST /api/streams/{name}/dashboards/_suggestions',
endpoint: 'POST /internal/streams/{name}/dashboards/_suggestions',
options: {
access: 'internal',
},
@ -224,9 +242,15 @@ const dashboardSchema = z.object({
});
const bulkDashboardsRoute = createServerRoute({
endpoint: `POST /api/streams/{name}/dashboards/_bulk`,
endpoint: `POST /api/streams/{name}/dashboards/_bulk 2023-10-31`,
options: {
access: 'internal',
access: 'public',
summary: 'Bulk update dashboards',
description:
'Bulk update dashboards linked to a stream. Can link new dashboards and delete existing ones.',
availability: {
stability: 'experimental',
},
},
security: {
authz: {

View file

@ -5,27 +5,35 @@
* 2.0.
*/
import { esqlRoutes } from './esql/route';
import { internalEsqlRoutes } from './internal/esql/route';
import { dashboardRoutes } from './dashboards/route';
import { crudRoutes } from './streams/crud/route';
import { enablementRoutes } from './streams/enablement/route';
import { managementRoutes } from './streams/management/route';
import { schemaRoutes } from './streams/schema/route';
import { processingRoutes } from './streams/processing/route';
import { internalSchemaRoutes } from './internal/streams/schema/route';
import { internalProcessingRoutes } from './internal/streams/processing/route';
import { ingestRoutes } from './streams/ingest/route';
import { lifecycleRoutes } from './streams/lifecycle/route';
import { internalLifecycleRoutes } from './internal/streams/lifecycle/route';
import { groupRoutes } from './streams/group/route';
import { internalDashboardRoutes } from './internal/dashboards/route';
import { internalCrudRoutes } from './internal/streams/crud/route';
import { internalManagementRoutes } from './internal/streams/management/route';
export const streamsRouteRepository = {
...esqlRoutes,
// internal APIs
...internalEsqlRoutes,
...internalDashboardRoutes,
...internalCrudRoutes,
...internalManagementRoutes,
...internalSchemaRoutes,
...internalLifecycleRoutes,
...internalProcessingRoutes,
// public APIs
...dashboardRoutes,
...crudRoutes,
...enablementRoutes,
...managementRoutes,
...schemaRoutes,
...processingRoutes,
...ingestRoutes,
...lifecycleRoutes,
...groupRoutes,
};

View file

@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { DashboardAsset } from '../../../../common/assets';
import { createServerRoute } from '../../create_server_route';
import { SanitizedDashboardAsset } from '../../dashboards/route';
export interface SuggestDashboardResponse {
suggestions: SanitizedDashboardAsset[];
}
function sanitizeDashboardAsset(asset: DashboardAsset): SanitizedDashboardAsset {
return {
id: asset.assetId,
label: asset.label,
tags: asset.tags,
};
}
const suggestDashboardsRoute = createServerRoute({
endpoint: 'POST /internal/streams/{name}/dashboards/_suggestions',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({
name: z.string(),
}),
query: z.object({
query: z.string(),
}),
body: z.object({
tags: z.optional(z.array(z.string())),
}),
}),
handler: async ({ params, request, getScopedClients }): Promise<SuggestDashboardResponse> => {
const { assetClient, streamsClient } = await getScopedClients({ request });
await streamsClient.ensureStream(params.path.name);
const {
query: { query },
body: { tags },
} = params;
const suggestions = (
await assetClient.getSuggestions({
assetTypes: ['dashboard'],
query,
tags,
})
).assets.map((asset) => {
return sanitizeDashboardAsset(asset as DashboardAsset);
});
return {
suggestions,
};
},
});
export const internalDashboardRoutes = {
...suggestDashboardsRoute,
};

View file

@ -8,7 +8,7 @@
import { UnparsedEsqlResponse, createTracedEsClient } from '@kbn/traced-es-client';
import { z } from '@kbn/zod';
import { isNumber } from 'lodash';
import { createServerRoute } from '../create_server_route';
import { createServerRoute } from '../../create_server_route';
import { excludeFrozenQuery, kqlQuery, rangeQuery } from './query_helpers';
export const executeEsqlRoute = createServerRoute({
@ -67,6 +67,6 @@ export const executeEsqlRoute = createServerRoute({
},
});
export const esqlRoutes = {
export const internalEsqlRoutes = {
...executeEsqlRoute,
};

View file

@ -0,0 +1,72 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/types';
import { isGroupStreamDefinition } from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import { createServerRoute } from '../../../create_server_route';
export interface StreamDetailsResponse {
details: {
count: number;
};
}
export const streamDetailRoute = createServerRoute({
endpoint: 'GET /internal/streams/{name}/_details',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({ name: z.string() }),
query: z.object({
start: z.string(),
end: z.string(),
}),
}),
handler: async ({ params, request, getScopedClients }): Promise<StreamDetailsResponse> => {
const { scopedClusterClient, streamsClient } = await getScopedClients({ request });
const streamEntity = await streamsClient.getStream(params.path.name);
const indexPattern = isGroupStreamDefinition(streamEntity)
? streamEntity.group.members.join(',')
: streamEntity.name;
// check doc count
const docCountResponse = await scopedClusterClient.asCurrentUser.search({
index: indexPattern,
track_total_hits: true,
query: {
range: {
'@timestamp': {
gte: params.query.start,
lte: params.query.end,
},
},
},
size: 0,
});
const count = (docCountResponse.hits.total as SearchTotalHits).value;
return {
details: {
count,
},
};
},
});
export const internalCrudRoutes = {
...streamDetailRoute,
};

View file

@ -7,13 +7,13 @@
import { isIlmLifecycle, isIngestStreamDefinition } from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import { createServerRoute } from '../../create_server_route';
import { ilmPhases } from '../../../lib/streams/lifecycle/ilm_phases';
import { getEffectiveLifecycle } from '../../../lib/streams/lifecycle/get_effective_lifecycle';
import { StatusError } from '../../../lib/streams/errors/status_error';
import { createServerRoute } from '../../../create_server_route';
import { ilmPhases } from '../../../../lib/streams/lifecycle/ilm_phases';
import { getEffectiveLifecycle } from '../../../../lib/streams/lifecycle/get_effective_lifecycle';
import { StatusError } from '../../../../lib/streams/errors/status_error';
const lifecycleStatsRoute = createServerRoute({
endpoint: 'GET /api/streams/{name}/lifecycle/_stats',
endpoint: 'GET /internal/streams/{name}/lifecycle/_stats',
options: {
access: 'internal',
},
@ -56,7 +56,7 @@ const lifecycleStatsRoute = createServerRoute({
});
const lifecycleIlmExplainRoute = createServerRoute({
endpoint: 'GET /api/streams/{name}/lifecycle/_explain',
endpoint: 'GET /internal/streams/{name}/lifecycle/_explain',
options: {
access: 'internal',
},
@ -83,7 +83,7 @@ const lifecycleIlmExplainRoute = createServerRoute({
},
});
export const lifecycleRoutes = {
export const internalLifecycleRoutes = {
...lifecycleStatsRoute,
...lifecycleIlmExplainRoute,
};

View file

@ -0,0 +1,103 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
SampleDocument,
conditionSchema,
conditionToQueryDsl,
getFields,
} from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import { checkAccess } from '../../../../lib/streams/stream_crud';
import { createServerRoute } from '../../../create_server_route';
import { DefinitionNotFoundError } from '../../../../lib/streams/errors/definition_not_found_error';
export const sampleStreamRoute = createServerRoute({
endpoint: 'POST /internal/streams/{name}/_sample',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({ name: z.string() }),
body: z.object({
if: z.optional(conditionSchema),
start: z.optional(z.number()),
end: z.optional(z.number()),
size: z.optional(z.number()),
}),
}),
handler: async ({ params, request, getScopedClients }) => {
const { scopedClusterClient } = await getScopedClients({ request });
const { read } = await checkAccess({ name: params.path.name, scopedClusterClient });
if (!read) {
throw new DefinitionNotFoundError(`Stream definition for ${params.path.name} not found`);
}
const { if: condition, start, end, size } = params.body;
const searchBody = {
query: {
bool: {
must: [
condition ? conditionToQueryDsl(condition) : { match_all: {} },
{
range: {
'@timestamp': {
gte: start,
lte: end,
format: 'epoch_millis',
},
},
},
],
},
},
// Conditions could be using fields which are not indexed or they could use it with other types than they are eventually mapped as.
// Because of this we can't rely on mapped fields to draw a sample, instead we need to use runtime fields to simulate what happens during
// ingest in the painless condition checks.
// This is less efficient than it could be - in some cases, these fields _are_ indexed with the right type and we could use them directly.
// This can be optimized in the future.
runtime_mappings: condition
? Object.fromEntries(
getFields(condition).map((field) => [
field.name,
{ type: field.type === 'string' ? ('keyword' as const) : ('double' as const) },
])
)
: undefined,
sort: [
{
'@timestamp': {
order: 'desc' as const,
},
},
],
terminate_after: size,
track_total_hits: false,
size,
};
const results = await scopedClusterClient.asCurrentUser.search({
index: params.path.name,
allow_no_indices: true,
...searchBody,
});
return { documents: results.hits.hits.map((hit) => hit._source) as SampleDocument[] };
},
});
export const internalManagementRoutes = {
...sampleStreamRoute,
};

View file

@ -12,9 +12,9 @@ import {
processorWithIdDefinitionSchema,
} from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import { checkAccess } from '../../../lib/streams/stream_crud';
import { createServerRoute } from '../../create_server_route';
import { DefinitionNotFoundError } from '../../../lib/streams/errors/definition_not_found_error';
import { checkAccess } from '../../../../lib/streams/stream_crud';
import { createServerRoute } from '../../../create_server_route';
import { DefinitionNotFoundError } from '../../../../lib/streams/errors/definition_not_found_error';
import { ProcessingSimulationParams, simulateProcessing } from './simulation_handler';
import { handleProcessingSuggestion } from './suggestions_handler';
@ -28,7 +28,7 @@ const paramsSchema = z.object({
}) satisfies z.Schema<ProcessingSimulationParams>;
export const simulateProcessorRoute = createServerRoute({
endpoint: 'POST /api/streams/{name}/processing/_simulate',
endpoint: 'POST /internal/streams/{name}/processing/_simulate',
options: {
access: 'internal',
},
@ -70,7 +70,7 @@ const suggestionsParamsSchema = z.object({
});
export const processingSuggestionRoute = createServerRoute({
endpoint: 'POST /api/streams/{name}/processing/_suggestions',
endpoint: 'POST /internal/streams/{name}/processing/_suggestions',
options: {
access: 'internal',
},
@ -96,7 +96,7 @@ export const processingSuggestionRoute = createServerRoute({
},
});
export const processingRoutes = {
export const internalProcessingRoutes = {
...simulateProcessorRoute,
...processingSuggestionRoute,
};

View file

@ -33,9 +33,9 @@ import {
FieldDefinition,
} from '@kbn/streams-schema';
import { mapValues, uniq, omit, isEmpty, uniqBy, some } from 'lodash';
import { StreamsClient } from '../../../lib/streams/client';
import { DetectedMappingFailureError } from '../../../lib/streams/errors/detected_mapping_failure_error';
import { formatToIngestProcessors } from '../../../lib/streams/helpers/processing';
import { StreamsClient } from '../../../../lib/streams/client';
import { DetectedMappingFailureError } from '../../../../lib/streams/errors/detected_mapping_failure_error';
import { formatToIngestProcessors } from '../../../../lib/streams/helpers/processing';
export interface ProcessingSimulationParams {
path: {

View file

@ -9,7 +9,7 @@ import { handleProcessingSuggestion, extractAndGroupPatterns } from './suggestio
import { simulateProcessing } from './simulation_handler';
import { InferenceClient } from '@kbn/inference-plugin/server';
import { ScopedClusterClient } from '@kbn/core-elasticsearch-client-server-internal';
import { StreamsClient } from '../../../lib/streams/client';
import { StreamsClient } from '../../../../lib/streams/client';
jest.mock('./simulation_handler', () => ({
simulateProcessing: jest.fn((params) =>

View file

@ -9,7 +9,7 @@ import { IScopedClusterClient } from '@kbn/core/server';
import { get, groupBy, mapValues, orderBy, shuffle, uniq, uniqBy } from 'lodash';
import { InferenceClient } from '@kbn/inference-plugin/server';
import { FlattenRecord } from '@kbn/streams-schema';
import { StreamsClient } from '../../../lib/streams/client';
import { StreamsClient } from '../../../../lib/streams/client';
import { simulateProcessing } from './simulation_handler';
import { ProcessingSuggestionBody } from './route';

View file

@ -11,14 +11,14 @@ import {
fieldDefinitionConfigSchema,
isWiredStreamDefinition,
} from '@kbn/streams-schema';
import { checkAccess } from '../../../lib/streams/stream_crud';
import { createServerRoute } from '../../create_server_route';
import { DefinitionNotFoundError } from '../../../lib/streams/errors/definition_not_found_error';
import { checkAccess } from '../../../../lib/streams/stream_crud';
import { createServerRoute } from '../../../create_server_route';
import { DefinitionNotFoundError } from '../../../../lib/streams/errors/definition_not_found_error';
const UNMAPPED_SAMPLE_SIZE = 500;
export const unmappedFieldsRoute = createServerRoute({
endpoint: 'GET /api/streams/{name}/schema/unmapped_fields',
endpoint: 'GET /internal/streams/{name}/schema/unmapped_fields',
options: {
access: 'internal',
},
@ -85,7 +85,7 @@ export const unmappedFieldsRoute = createServerRoute({
const FIELD_SIMILATION_SAMPLE_SIZE = 200;
export const schemaFieldsSimulationRoute = createServerRoute({
endpoint: 'POST /api/streams/{name}/schema/fields_simulation',
endpoint: 'POST /internal/streams/{name}/schema/fields_simulation',
options: {
access: 'internal',
},
@ -285,7 +285,7 @@ export const schemaFieldsSimulationRoute = createServerRoute({
},
});
export const schemaRoutes = {
export const internalSchemaRoutes = {
...unmappedFieldsRoute,
...schemaFieldsSimulationRoute,
};

View file

@ -5,7 +5,6 @@
* 2.0.
*/
import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/types';
import {
isGroupStreamDefinition,
StreamDefinition,
@ -23,9 +22,14 @@ import { createServerRoute } from '../../create_server_route';
import { readStream } from './read_stream';
export const readStreamRoute = createServerRoute({
endpoint: 'GET /api/streams/{name}',
endpoint: 'GET /api/streams/{name} 2023-10-31',
options: {
access: 'internal',
access: 'public',
summary: 'Get a stream',
description: 'Fetches a stream definition and associated dashboards',
availability: {
stability: 'experimental',
},
},
security: {
authz: {
@ -53,67 +57,15 @@ export const readStreamRoute = createServerRoute({
},
});
export interface StreamDetailsResponse {
details: {
count: number;
};
}
export const streamDetailRoute = createServerRoute({
endpoint: 'GET /api/streams/{name}/_details',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({ name: z.string() }),
query: z.object({
start: z.string(),
end: z.string(),
}),
}),
handler: async ({ params, request, getScopedClients }): Promise<StreamDetailsResponse> => {
const { scopedClusterClient, streamsClient } = await getScopedClients({ request });
const streamEntity = await streamsClient.getStream(params.path.name);
const indexPattern = isGroupStreamDefinition(streamEntity)
? streamEntity.group.members.join(',')
: streamEntity.name;
// check doc count
const docCountResponse = await scopedClusterClient.asCurrentUser.search({
index: indexPattern,
track_total_hits: true,
query: {
range: {
'@timestamp': {
gte: params.query.start,
lte: params.query.end,
},
},
},
size: 0,
});
const count = (docCountResponse.hits.total as SearchTotalHits).value;
return {
details: {
count,
},
};
},
});
export const listStreamsRoute = createServerRoute({
endpoint: 'GET /api/streams',
endpoint: 'GET /api/streams 2023-10-31',
options: {
access: 'internal',
access: 'public',
description: 'Fetches list of all streams',
summary: 'Get stream list',
availability: {
stability: 'experimental',
},
},
security: {
authz: {
@ -132,9 +84,15 @@ export const listStreamsRoute = createServerRoute({
});
export const editStreamRoute = createServerRoute({
endpoint: 'PUT /api/streams/{name}',
endpoint: 'PUT /api/streams/{name} 2023-10-31',
options: {
access: 'internal',
access: 'public',
summary: 'Create or update a stream',
description:
'Creates or updates a stream definition. Classic streams can not be created through this API, only updated',
availability: {
stability: 'experimental',
},
},
security: {
authz: {
@ -185,9 +143,14 @@ export const editStreamRoute = createServerRoute({
});
export const deleteStreamRoute = createServerRoute({
endpoint: 'DELETE /api/streams/{name}',
endpoint: 'DELETE /api/streams/{name} 2023-10-31',
options: {
access: 'internal',
access: 'public',
summary: 'Delete a stream',
description: 'Deletes a stream definition and the underlying data stream',
availability: {
stability: 'experimental',
},
},
security: {
authz: {
@ -212,7 +175,6 @@ export const deleteStreamRoute = createServerRoute({
export const crudRoutes = {
...readStreamRoute,
...streamDetailRoute,
...listStreamsRoute,
...editStreamRoute,
...deleteStreamRoute,

View file

@ -12,10 +12,15 @@ import { DisableStreamsResponse, EnableStreamsResponse } from '../../../lib/stre
import { createServerRoute } from '../../create_server_route';
export const enableStreamsRoute = createServerRoute({
endpoint: 'POST /api/streams/_enable',
endpoint: 'POST /api/streams/_enable 2023-10-31',
params: z.object({}),
options: {
access: 'internal',
access: 'public',
summary: 'Enable streams',
description: 'Enables wired streams',
availability: {
stability: 'experimental',
},
},
security: {
authz: {
@ -42,14 +47,22 @@ export const enableStreamsRoute = createServerRoute({
});
export const disableStreamsRoute = createServerRoute({
endpoint: 'POST /api/streams/_disable',
endpoint: 'POST /api/streams/_disable 2023-10-31',
params: z.object({}),
options: {
access: 'internal',
access: 'public',
summary: 'Disable streams',
description:
'Disables wired streams and deletes all existing stream definitions. The data of wired streams is deleted, but the data of classic streams is preserved.',
availability: {
stability: 'experimental',
},
},
security: {
authz: {
requiredPrivileges: ['streams_write'],
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
handler: async ({ request, getScopedClients }): Promise<DisableStreamsResponse> => {

View file

@ -16,9 +16,14 @@ import {
import { createServerRoute } from '../../create_server_route';
const readGroupRoute = createServerRoute({
endpoint: 'GET /api/streams/{name}/_group',
endpoint: 'GET /api/streams/{name}/_group 2023-10-31',
options: {
access: 'internal',
access: 'public',
summary: 'Get group stream settings',
description: 'Fetches the group settings of a group stream definition',
availability: {
stability: 'experimental',
},
},
security: {
authz: {
@ -48,9 +53,14 @@ const readGroupRoute = createServerRoute({
});
const upsertGroupRoute = createServerRoute({
endpoint: 'PUT /api/streams/{name}/_group',
endpoint: 'PUT /api/streams/{name}/_group 2023-10-31',
options: {
access: 'internal',
access: 'public',
description: 'Upserts the group settings of a group stream definition',
summary: 'Upsert group stream settings',
availability: {
stability: 'experimental',
},
},
security: {
authz: {

View file

@ -17,9 +17,14 @@ import { z } from '@kbn/zod';
import { createServerRoute } from '../../create_server_route';
const readIngestRoute = createServerRoute({
endpoint: 'GET /api/streams/{name}/_ingest',
endpoint: 'GET /api/streams/{name}/_ingest 2023-10-31',
options: {
access: 'internal',
access: 'public',
summary: 'Get ingest stream settings',
description: 'Fetches the ingest settings of an ingest stream definition',
availability: {
stability: 'experimental',
},
},
security: {
authz: {
@ -53,9 +58,14 @@ const readIngestRoute = createServerRoute({
});
const upsertIngestRoute = createServerRoute({
endpoint: 'PUT /api/streams/{name}/_ingest',
endpoint: 'PUT /api/streams/{name}/_ingest 2023-10-31',
options: {
access: 'internal',
access: 'public',
summary: 'Update ingest stream settings',
description: 'Upserts the ingest settings of an ingest stream definition',
availability: {
stability: 'experimental',
},
},
security: {
authz: {

View file

@ -5,22 +5,20 @@
* 2.0.
*/
import {
SampleDocument,
conditionSchema,
conditionToQueryDsl,
getFields,
} from '@kbn/streams-schema';
import { conditionSchema } from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import { ResyncStreamsResponse } from '../../../lib/streams/client';
import { checkAccess } from '../../../lib/streams/stream_crud';
import { createServerRoute } from '../../create_server_route';
import { DefinitionNotFoundError } from '../../../lib/streams/errors/definition_not_found_error';
export const forkStreamsRoute = createServerRoute({
endpoint: 'POST /api/streams/{name}/_fork',
endpoint: 'POST /api/streams/{name}/_fork 2023-10-31',
options: {
access: 'internal',
access: 'public',
description: 'Forks a wired stream and creates a child stream',
summary: 'Fork a stream',
availability: {
stability: 'experimental',
},
},
security: {
authz: {
@ -49,9 +47,14 @@ export const forkStreamsRoute = createServerRoute({
});
export const resyncStreamsRoute = createServerRoute({
endpoint: 'POST /api/streams/_resync',
endpoint: 'POST /api/streams/_resync 2023-10-31',
options: {
access: 'internal',
access: 'public',
description: 'Resyncs all streams, making sure that Elasticsearch assets are up to date',
summary: 'Resync streams',
availability: {
stability: 'experimental',
},
},
security: {
authz: {
@ -75,7 +78,9 @@ export const getStreamsStatusRoute = createServerRoute({
},
security: {
authz: {
requiredPrivileges: ['streams_read'],
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
handler: async ({ request, getScopedClients }): Promise<{ enabled: boolean }> => {
@ -87,91 +92,8 @@ export const getStreamsStatusRoute = createServerRoute({
},
});
export const sampleStreamRoute = createServerRoute({
endpoint: 'POST /api/streams/{name}/_sample',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({ name: z.string() }),
body: z.object({
if: z.optional(conditionSchema),
start: z.optional(z.number()),
end: z.optional(z.number()),
size: z.optional(z.number()),
}),
}),
handler: async ({ params, request, getScopedClients }) => {
const { scopedClusterClient } = await getScopedClients({ request });
const { read } = await checkAccess({ name: params.path.name, scopedClusterClient });
if (!read) {
throw new DefinitionNotFoundError(`Stream definition for ${params.path.name} not found`);
}
const { if: condition, start, end, size } = params.body;
const searchBody = {
query: {
bool: {
must: [
condition ? conditionToQueryDsl(condition) : { match_all: {} },
{
range: {
'@timestamp': {
gte: start,
lte: end,
format: 'epoch_millis',
},
},
},
],
},
},
// Conditions could be using fields which are not indexed or they could use it with other types than they are eventually mapped as.
// Because of this we can't rely on mapped fields to draw a sample, instead we need to use runtime fields to simulate what happens during
// ingest in the painless condition checks.
// This is less efficient than it could be - in some cases, these fields _are_ indexed with the right type and we could use them directly.
// This can be optimized in the future.
runtime_mappings: condition
? Object.fromEntries(
getFields(condition).map((field) => [
field.name,
{ type: field.type === 'string' ? ('keyword' as const) : ('double' as const) },
])
)
: undefined,
sort: [
{
'@timestamp': {
order: 'desc' as const,
},
},
],
terminate_after: size,
track_total_hits: false,
size,
};
const results = await scopedClusterClient.asCurrentUser.search({
index: params.path.name,
allow_no_indices: true,
...searchBody,
});
return { documents: results.hits.hits.map((hit) => hit._source) as SampleDocument[] };
},
});
export const managementRoutes = {
...forkStreamsRoute,
...resyncStreamsRoute,
...getStreamsStatusRoute,
...sampleStreamRoute,
};

View file

@ -42,19 +42,22 @@ const SamplePreviewTableContent = ({
const { value, loading, error } = useStreamsAppFetch(
({ signal }) => {
return streamsRepositoryClient.fetch('POST /api/streams/{name}/schema/fields_simulation', {
signal,
params: {
path: {
name: stream.name,
return streamsRepositoryClient.fetch(
'POST /internal/streams/{name}/schema/fields_simulation',
{
signal,
params: {
path: {
name: stream.name,
},
body: {
field_definitions: [
{ ...convertToFieldDefinitionConfig(nextField), name: nextField.name },
],
},
},
body: {
field_definitions: [
{ ...convertToFieldDefinitionConfig(nextField), name: nextField.name },
],
},
},
});
}
);
},
[stream.name, nextField, streamsRepositoryClient],
{ disableToastOnError: true }

View file

@ -46,7 +46,7 @@ export const useSchemaFields = ({
refresh: refreshUnmappedFields,
} = useStreamsAppFetch(
({ signal }) => {
return streamsRepositoryClient.fetch('GET /api/streams/{name}/schema/unmapped_fields', {
return streamsRepositoryClient.fetch('GET /internal/streams/{name}/schema/unmapped_fields', {
signal,
params: {
path: {
@ -110,7 +110,7 @@ export const useSchemaFields = ({
throw new Error('The field is not different, hence updating is not necessary.');
}
await streamsRepositoryClient.fetch(`PUT /api/streams/{name}/_ingest`, {
await streamsRepositoryClient.fetch(`PUT /api/streams/{name}/_ingest 2023-10-31`, {
signal: abortController.signal,
params: {
path: {
@ -162,7 +162,7 @@ export const useSchemaFields = ({
throw new Error('The field is not mapped, hence it cannot be unmapped.');
}
await streamsRepositoryClient.fetch(`PUT /api/streams/{name}/_ingest`, {
await streamsRepositoryClient.fetch(`PUT /api/streams/{name}/_ingest 2023-10-31`, {
signal: abortController.signal,
params: {
path: {

View file

@ -184,7 +184,7 @@ function InnerGrokAiSuggestions({
setSuggestionsError(undefined);
setSuggestions(undefined);
streamsRepositoryClient
.fetch('POST /api/streams/{name}/processing/_suggestions', {
.fetch('POST /internal/streams/{name}/processing/_suggestions', {
signal: abortController.signal,
params: {
path: { name: definition.stream.name },

View file

@ -23,18 +23,21 @@ export function createSamplesFetchActor({
streamsRepositoryClient,
}: Pick<SimulationMachineDeps, 'streamsRepositoryClient'>) {
return fromPromise<FlattenRecord[], SamplesFetchInput>(async ({ input, signal }) => {
const samplesBody = await streamsRepositoryClient.fetch('POST /api/streams/{name}/_sample', {
signal,
params: {
path: { name: input.streamName },
body: {
if: input.condition,
start: input.absoluteTimeRange.start,
end: input.absoluteTimeRange.end,
size: 100,
const samplesBody = await streamsRepositoryClient.fetch(
'POST /internal/streams/{name}/_sample',
{
signal,
params: {
path: { name: input.streamName },
body: {
if: input.condition,
start: input.absoluteTimeRange.start,
end: input.absoluteTimeRange.end,
size: 100,
},
},
},
});
}
);
return samplesBody.documents.map(flattenObjectNestedLast) as FlattenRecord[];
});

View file

@ -23,7 +23,7 @@ export function createSimulationRunnerActor({
streamsRepositoryClient,
}: Pick<SimulationMachineDeps, 'streamsRepositoryClient'>) {
return fromPromise<Simulation, SimulationRunnerInput>(({ input, signal }) =>
streamsRepositoryClient.fetch('POST /api/streams/{name}/processing/_simulate', {
streamsRepositoryClient.fetch('POST /internal/streams/{name}/processing/_simulate', {
signal,
params: {
path: { name: input.streamName },

View file

@ -16,7 +16,7 @@ import {
import { ProcessorDefinitionWithUIAttributes } from '../../types';
import { PreviewDocsFilterOption } from './preview_docs_filter';
export type Simulation = APIReturnType<'POST /api/streams/{name}/processing/_simulate'>;
export type Simulation = APIReturnType<'POST /internal/streams/{name}/processing/_simulate'>;
export interface SimulationMachineDeps {
data: DataPublicPluginStart;

View file

@ -19,7 +19,7 @@ import { StreamEnrichmentServiceDependencies } from './types';
import { processorConverter } from '../../utils';
import { ProcessorDefinitionWithUIAttributes } from '../../types';
export type UpsertStreamResponse = APIReturnType<'PUT /api/streams/{name}/_ingest'>;
export type UpsertStreamResponse = APIReturnType<'PUT /api/streams/{name}/_ingest 2023-10-31'>;
export interface UpsertStreamInput {
definition: IngestStreamGetResponse;
@ -31,7 +31,7 @@ export function createUpsertStreamActor({
streamsRepositoryClient,
}: Pick<StreamEnrichmentServiceDependencies, 'streamsRepositoryClient'>) {
return fromPromise<UpsertStreamResponse, UpsertStreamInput>(({ input, signal }) => {
return streamsRepositoryClient.fetch(`PUT /api/streams/{name}/_ingest`, {
return streamsRepositoryClient.fetch(`PUT /api/streams/{name}/_ingest 2023-10-31`, {
signal,
params: {
path: {

View file

@ -211,7 +211,7 @@ export const useIngestionRatePerTier = ({
}
const ilmExplain = await streamsRepositoryClient.fetch(
'GET /api/streams/{name}/lifecycle/_explain',
'GET /internal/streams/{name}/lifecycle/_explain',
{
params: { path: { name: definition.stream.name } },
signal,

View file

@ -57,7 +57,7 @@ export function IlmSummary({
({ signal }) => {
if (!definition) return;
return streamsRepositoryClient.fetch('GET /api/streams/{name}/lifecycle/_stats', {
return streamsRepositoryClient.fetch('GET /internal/streams/{name}/lifecycle/_stats', {
params: { path: { name: definition.stream.name } },
signal,
});

View file

@ -146,7 +146,7 @@ export function StreamDetailLifecycle({
},
} as IngestUpsertRequest;
await streamsRepositoryClient.fetch('PUT /api/streams/{name}/_ingest', {
await streamsRepositoryClient.fetch('PUT /api/streams/{name}/_ingest 2023-10-31', {
params: {
path: { name: definition.stream.name },
body: request,

View file

@ -57,7 +57,7 @@ export function ControlBar({
return;
}
return streamsRepositoryClient.fetch('POST /api/streams/{name}/_fork', {
return streamsRepositoryClient.fetch('POST /api/streams/{name}/_fork 2023-10-31', {
signal,
params: {
path: {
@ -96,7 +96,7 @@ export function ControlBar({
},
} as IngestUpsertRequest;
return streamsRepositoryClient.fetch('PUT /api/streams/{name}/_ingest', {
return streamsRepositoryClient.fetch('PUT /api/streams/{name}/_ingest 2023-10-31', {
signal,
params: {
path: {

View file

@ -44,7 +44,7 @@ export function StreamDetailRouting({
const streamsListFetch = useStreamsAppFetch(
({ signal }) => {
return streamsRepositoryClient.fetch('GET /api/streams', {
return streamsRepositoryClient.fetch('GET /api/streams 2023-10-31', {
signal,
});
},

View file

@ -117,7 +117,7 @@ export function StreamDeleteModal({
onClick={async () => {
try {
setDeleteInProgress(true);
await streamsRepositoryClient.fetch('DELETE /api/streams/{name}', {
await streamsRepositoryClient.fetch('DELETE /api/streams/{name} 2023-10-31', {
signal: abortController.signal,
params: {
path: {

View file

@ -65,7 +65,7 @@ export function AddDashboardFlyout({
const dashboardSuggestionsFetch = useStreamsAppFetch(
({ signal }) => {
return streamsRepositoryClient
.fetch('POST /api/streams/{name}/dashboards/_suggestions', {
.fetch('POST /internal/streams/{name}/dashboards/_suggestions', {
signal,
params: {
path: {

View file

@ -137,7 +137,7 @@ export function StreamDetailOverview({ definition }: { definition?: IngestStream
) {
return undefined;
}
return streamsRepositoryClient.fetch('GET /api/streams/{name}/_details', {
return streamsRepositoryClient.fetch('GET /internal/streams/{name}/_details', {
signal,
params: {
path: {

View file

@ -27,7 +27,7 @@ export function StreamListView() {
const streamsListFetch = useStreamsAppFetch(
({ signal }) => {
return streamsRepositoryClient.fetch('GET /api/streams', {
return streamsRepositoryClient.fetch('GET /api/streams 2023-10-31', {
signal,
});
},

View file

@ -25,7 +25,7 @@ export const useDashboardsApi = (name?: string) => {
return;
}
await streamsRepositoryClient.fetch('POST /api/streams/{name}/dashboards/_bulk', {
await streamsRepositoryClient.fetch('POST /api/streams/{name}/dashboards/_bulk 2023-10-31', {
signal,
params: {
path: {
@ -47,7 +47,7 @@ export const useDashboardsApi = (name?: string) => {
if (!name) {
return;
}
await streamsRepositoryClient.fetch('POST /api/streams/{name}/dashboards/_bulk', {
await streamsRepositoryClient.fetch('POST /api/streams/{name}/dashboards/_bulk 2023-10-31', {
signal,
params: {
path: {

View file

@ -21,7 +21,7 @@ export const useDashboardsFetch = (name?: string) => {
if (!name) {
return Promise.resolve(undefined);
}
return streamsRepositoryClient.fetch('GET /api/streams/{name}/dashboards', {
return streamsRepositoryClient.fetch('GET /api/streams/{name}/dashboards 2023-10-31', {
signal,
params: {
path: {

View file

@ -39,7 +39,7 @@ export function StreamDetailContextProvider({
} = useStreamsAppFetch(
async ({ signal }) => {
return streamsRepositoryClient
.fetch('GET /api/streams/{name}', {
.fetch('GET /api/streams/{name} 2023-10-31', {
signal,
params: {
path: {

View file

@ -19,7 +19,7 @@ export const useWiredStreams = () => {
} = useKibana();
const result = useStreamsAppFetch(
async ({ signal }) => streamsRepositoryClient.fetch('GET /api/streams', { signal }),
async ({ signal }) => streamsRepositoryClient.fetch('GET /api/streams 2023-10-31', { signal }),
[streamsRepositoryClient]
);

View file

@ -53,23 +53,29 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
}
async function linkDashboard(id: string) {
const response = await apiClient.fetch('PUT /api/streams/{name}/dashboards/{dashboardId}', {
params: { path: { name: 'logs', dashboardId: id } },
});
const response = await apiClient.fetch(
'PUT /api/streams/{name}/dashboards/{dashboardId} 2023-10-31',
{
params: { path: { name: 'logs', dashboardId: id } },
}
);
expect(response.status).to.be(200);
}
async function unlinkDashboard(id: string) {
const response = await apiClient.fetch('DELETE /api/streams/{name}/dashboards/{dashboardId}', {
params: { path: { name: 'logs', dashboardId: id } },
});
const response = await apiClient.fetch(
'DELETE /api/streams/{name}/dashboards/{dashboardId} 2023-10-31',
{
params: { path: { name: 'logs', dashboardId: id } },
}
);
expect(response.status).to.be(200);
}
async function bulkLinkDashboard(...ids: string[]) {
const response = await apiClient.fetch('POST /api/streams/{name}/dashboards/_bulk', {
const response = await apiClient.fetch('POST /api/streams/{name}/dashboards/_bulk 2023-10-31', {
params: {
path: { name: 'logs' },
body: {
@ -88,7 +94,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
}
async function bulkUnlinkDashboard(...ids: string[]) {
const response = await apiClient.fetch('POST /api/streams/{name}/dashboards/_bulk', {
const response = await apiClient.fetch('POST /api/streams/{name}/dashboards/_bulk 2023-10-31', {
params: {
path: { name: 'logs' },
body: {
@ -134,7 +140,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
it('lists the dashboard in the stream response', async () => {
const response = await apiClient.fetch('GET /api/streams/{name}', {
const response = await apiClient.fetch('GET /api/streams/{name} 2023-10-31', {
params: { path: { name: 'logs' } },
});
@ -144,7 +150,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
it('lists the dashboard in the dashboards get response', async () => {
const response = await apiClient.fetch('GET /api/streams/{name}/dashboards', {
const response = await apiClient.fetch('GET /api/streams/{name}/dashboards 2023-10-31', {
params: { path: { name: 'logs' } },
});
@ -161,7 +167,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
it('dropped all dashboards', async () => {
const response = await apiClient.fetch('GET /api/streams/{name}/dashboards', {
const response = await apiClient.fetch('GET /api/streams/{name}/dashboards 2023-10-31', {
params: { path: { name: 'logs' } },
});
@ -174,7 +180,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
await unlinkDashboard(SEARCH_DASHBOARD_ID);
await linkDashboard(SEARCH_DASHBOARD_ID);
const response = await apiClient.fetch('GET /api/streams/{name}/dashboards', {
const response = await apiClient.fetch('GET /api/streams/{name}/dashboards 2023-10-31', {
params: { path: { name: 'logs' } },
});
@ -190,7 +196,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
it('no longer lists the dashboard as a linked asset', async () => {
const response = await apiClient.fetch('GET /api/streams/{name}/dashboards', {
const response = await apiClient.fetch('GET /api/streams/{name}/dashboards 2023-10-31', {
params: { path: { name: 'logs' } },
});
@ -214,7 +220,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
it('shows the linked dashboards', async () => {
const response = await apiClient.fetch('GET /api/streams/{name}/dashboards', {
const response = await apiClient.fetch('GET /api/streams/{name}/dashboards 2023-10-31', {
params: { path: { name: 'logs' } },
});
@ -227,7 +233,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
it('only shows the remaining linked dashboard', async () => {
const response = await apiClient.fetch('GET /api/streams/{name}/dashboards', {
const response = await apiClient.fetch('GET /api/streams/{name}/dashboards 2023-10-31', {
params: { path: { name: 'logs' } },
});
@ -253,7 +259,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
describe('after creating multiple dashboards', () => {
it('suggests dashboards to link', async () => {
const response = await apiClient.fetch(
'POST /api/streams/{name}/dashboards/_suggestions',
'POST /internal/streams/{name}/dashboards/_suggestions',
{
params: { path: { name: 'logs' }, body: { tags: [] }, query: { query: '' } },
}
@ -265,7 +271,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('filters suggested dashboards based on tags', async () => {
const response = await apiClient.fetch(
'POST /api/streams/{name}/dashboards/_suggestions',
'POST /internal/streams/{name}/dashboards/_suggestions',
{
params: { path: { name: 'logs' }, body: { tags: [TAG_ID] }, query: { query: '' } },
}
@ -277,7 +283,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('filters suggested dashboards based on the query', async () => {
const response = await apiClient.fetch(
'POST /api/streams/{name}/dashboards/_suggestions',
'POST /internal/streams/{name}/dashboards/_suggestions',
{
params: {
path: { name: 'logs' },

View file

@ -39,7 +39,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const {
body: { streams },
status,
} = await apiClient.fetch('GET /api/streams');
} = await apiClient.fetch('GET /api/streams 2023-10-31');
expect(status).to.eql(200);
@ -56,7 +56,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
it('Allows setting processing on classic streams', async () => {
const putResponse = await apiClient.fetch('PUT /api/streams/{name}', {
const putResponse = await apiClient.fetch('PUT /api/streams/{name} 2023-10-31', {
params: {
path: {
name: TEST_STREAM_NAME,
@ -88,7 +88,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
expect(putResponse.body).to.have.property('acknowledged', true);
const getResponse = await apiClient.fetch('GET /api/streams/{name}', {
const getResponse = await apiClient.fetch('GET /api/streams/{name} 2023-10-31', {
params: { path: { name: TEST_STREAM_NAME } },
});
@ -176,7 +176,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
it('Allows removing processing on classic streams', async () => {
const response = await apiClient.fetch('PUT /api/streams/{name}', {
const response = await apiClient.fetch('PUT /api/streams/{name} 2023-10-31', {
params: {
path: { name: TEST_STREAM_NAME },
body: {
@ -214,7 +214,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
it('Allows deleting classic streams', async () => {
const deleteStreamResponse = await apiClient.fetch('DELETE /api/streams/{name}', {
const deleteStreamResponse = await apiClient.fetch('DELETE /api/streams/{name} 2023-10-31', {
params: {
path: {
name: TEST_STREAM_NAME,
@ -224,7 +224,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
expect(deleteStreamResponse.status).to.eql(200);
const getStreamsResponse = await apiClient.fetch('GET /api/streams');
const getStreamsResponse = await apiClient.fetch('GET /api/streams 2023-10-31');
expect(getStreamsResponse.status).to.eql(200);
@ -270,7 +270,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
it('Allows adding processing to classic streams without pipeline', async () => {
const putResponse = await apiClient.fetch('PUT /api/streams/{name}', {
const putResponse = await apiClient.fetch('PUT /api/streams/{name} 2023-10-31', {
params: {
path: {
name: DATA_STREAM_NAME,

View file

@ -44,21 +44,21 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
if (isGroupStreamDefinitionBase(stream) || isUnwiredStreamDefinition(stream)) return [];
return stream.ingest.wired.routing.map((r) => r.destination);
}
const logs = await apiClient.fetch('GET /api/streams/{name}', {
const logs = await apiClient.fetch('GET /api/streams/{name} 2023-10-31', {
params: {
path: { name: 'logs' },
},
});
expect(getChildNames(logs.body.stream)).to.contain('logs.deeply');
const logsDeeply = await apiClient.fetch('GET /api/streams/{name}', {
const logsDeeply = await apiClient.fetch('GET /api/streams/{name} 2023-10-31', {
params: {
path: { name: 'logs.deeply' },
},
});
expect(getChildNames(logsDeeply.body.stream)).to.contain('logs.deeply.nested');
const logsDeeplyNested = await apiClient.fetch('GET /api/streams/{name}', {
const logsDeeplyNested = await apiClient.fetch('GET /api/streams/{name} 2023-10-31', {
params: {
path: { name: 'logs.deeply.nested' },
},
@ -66,11 +66,14 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
expect(getChildNames(logsDeeplyNested.body.stream)).to.contain(
'logs.deeply.nested.streamname'
);
const logsDeeplyNestedStreamname = await apiClient.fetch('GET /api/streams/{name}', {
params: {
path: { name: 'logs.deeply.nested.streamname' },
},
});
const logsDeeplyNestedStreamname = await apiClient.fetch(
'GET /api/streams/{name} 2023-10-31',
{
params: {
path: { name: 'logs.deeply.nested.streamname' },
},
}
);
expect(
(logsDeeplyNestedStreamname.body as WiredStreamGetResponse).stream.ingest.wired.fields
).to.eql({

View file

@ -97,7 +97,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('returns a 404 for logs', async () => {
await apiClient
.fetch('GET /api/streams/{name}', {
.fetch('GET /api/streams/{name} 2023-10-31', {
params: {
path: {
name: 'logs',

View file

@ -36,7 +36,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('successfully creates a GroupStream', async () => {
await apiClient
.fetch('PUT /api/streams/{name}', {
.fetch('PUT /api/streams/{name} 2023-10-31', {
params: {
path: { name: 'test-group' },
body: {
@ -55,7 +55,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('successfully creates a second GroupStream', async () => {
await apiClient
.fetch('PUT /api/streams/{name}', {
.fetch('PUT /api/streams/{name} 2023-10-31', {
params: {
path: { name: 'test-group-too' },
body: {
@ -74,7 +74,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('unsuccessfully updates a GroupStream with an uknown stream', async () => {
await apiClient
.fetch('PUT /api/streams/{name}', {
.fetch('PUT /api/streams/{name} 2023-10-31', {
params: {
path: { name: 'test-group' },
body: {
@ -92,7 +92,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('unsuccessfully updates a GroupStream with an itself as a member', async () => {
await apiClient
.fetch('PUT /api/streams/{name}', {
.fetch('PUT /api/streams/{name} 2023-10-31', {
params: {
path: { name: 'test-group' },
body: {
@ -110,7 +110,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('unsuccessfully updates a GroupStream with a forbidden member', async () => {
await apiClient
.fetch('PUT /api/streams/{name}', {
.fetch('PUT /api/streams/{name} 2023-10-31', {
params: {
path: { name: 'test-group' },
body: {
@ -128,7 +128,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('successfully deletes a GroupStream', async () => {
await apiClient
.fetch('DELETE /api/streams/{name}', {
.fetch('DELETE /api/streams/{name} 2023-10-31', {
params: {
path: { name: 'test-group-too' },
},
@ -138,7 +138,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('successfully reads a GroupStream', async () => {
const response = await apiClient
.fetch('GET /api/streams/{name}', {
.fetch('GET /api/streams/{name} 2023-10-31', {
params: {
path: { name: 'test-group' },
},
@ -157,7 +157,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('successfully upserts a GroupStream from _group', async () => {
const response = await apiClient
.fetch('PUT /api/streams/{name}/_group', {
.fetch('PUT /api/streams/{name}/_group 2023-10-31', {
params: {
path: { name: 'test-group-3' },
body: {
@ -176,7 +176,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('successfully reads a GroupStream from _group', async () => {
const response = await apiClient
.fetch('GET /api/streams/{name}/_group', {
.fetch('GET /api/streams/{name}/_group 2023-10-31', {
params: {
path: { name: 'test-group-3' },
},
@ -190,7 +190,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
it('successfully lists a GroupStream', async () => {
const response = await apiClient.fetch('GET /api/streams').expect(200);
const response = await apiClient.fetch('GET /api/streams 2023-10-31').expect(200);
expect(response.body.streams.some((stream) => stream.name === 'test-group')).to.eql(true);
expect(response.body.streams.some((stream) => stream.name === 'test-group-3')).to.eql(true);
});
@ -198,7 +198,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('unsuccessfully creates a group stream with the same name as a unwired stream', async () => {
await esClient.index({ index: 'metrics-test-test', document: { '@timestamp': '2025' } });
await apiClient
.fetch('PUT /api/streams/{name}', {
.fetch('PUT /api/streams/{name} 2023-10-31', {
params: {
path: { name: 'metrics-test-test' },
body: {
@ -216,7 +216,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
it('unsuccessfully creates a group stream prefixed with logs', async () => {
await apiClient
.fetch('PUT /api/streams/{name}', {
.fetch('PUT /api/streams/{name} 2023-10-31', {
params: {
path: { name: 'logs.group' },
body: {

View file

@ -130,7 +130,7 @@ const streams: StreamPutItem[] = [
export async function createStreams(apiClient: StreamsSupertestRepositoryClient) {
for (const { name, ...stream } of streams) {
await apiClient
.fetch('PUT /api/streams/{name}', {
.fetch('PUT /api/streams/{name} 2023-10-31', {
params: {
body: {
...stream,

View file

@ -14,11 +14,11 @@ import { StreamsRouteRepository } from '@kbn/streams-plugin/server';
import { StreamsSupertestRepositoryClient } from './repository_client';
export async function enableStreams(client: StreamsSupertestRepositoryClient) {
await client.fetch('POST /api/streams/_enable').expect(200);
await client.fetch('POST /api/streams/_enable 2023-10-31').expect(200);
}
export async function disableStreams(client: StreamsSupertestRepositoryClient) {
await client.fetch('POST /api/streams/_disable').expect(200);
await client.fetch('POST /api/streams/_disable 2023-10-31').expect(200);
}
export async function indexDocument(esClient: Client, index: string, document: JsonObject) {
@ -51,11 +51,11 @@ export async function forkStream(
root: string,
body: ClientRequestParamsOf<
StreamsRouteRepository,
'POST /api/streams/{name}/_fork'
'POST /api/streams/{name}/_fork 2023-10-31'
>['params']['body']
) {
return client
.fetch(`POST /api/streams/{name}/_fork`, {
.fetch(`POST /api/streams/{name}/_fork 2023-10-31`, {
params: {
path: {
name: root,
@ -74,7 +74,7 @@ export async function putStream(
expectStatusCode: number = 200
) {
return await apiClient
.fetch('PUT /api/streams/{name}', {
.fetch('PUT /api/streams/{name} 2023-10-31', {
params: {
path: {
name,
@ -92,7 +92,7 @@ export async function getStream(
expectStatusCode: number = 200
) {
return await apiClient
.fetch('GET /api/streams/{name}', {
.fetch('GET /api/streams/{name} 2023-10-31', {
params: {
path: {
name,
@ -109,7 +109,7 @@ export async function getIlmStats(
expectStatusCode: number = 200
) {
return await apiClient
.fetch('GET /api/streams/{name}/lifecycle/_stats', {
.fetch('GET /internal/streams/{name}/lifecycle/_stats', {
params: {
path: {
name,

View file

@ -23,12 +23,12 @@ async function simulateProcessingForStream(
name: string,
body: ClientRequestParamsOf<
StreamsRouteRepository,
'POST /api/streams/{name}/processing/_simulate'
'POST /internal/streams/{name}/processing/_simulate'
>['params']['body'],
statusCode = 200
) {
return client
.fetch('POST /api/streams/{name}/processing/_simulate', {
.fetch('POST /internal/streams/{name}/processing/_simulate', {
params: {
path: { name },
body,

View file

@ -43,7 +43,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
describe('Unmapped fields API', () => {
it('Returns unmapped fields', async () => {
const response = await apiClient
.fetch('GET /api/streams/{name}/schema/unmapped_fields', {
.fetch('GET /internal/streams/{name}/schema/unmapped_fields', {
params: {
path: {
name: 'logs',
@ -58,7 +58,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
describe('Fields simulation API', () => {
it('Returns failure status when simulation would fail', async () => {
const response = await apiClient.fetch(
'POST /api/streams/{name}/schema/fields_simulation',
'POST /internal/streams/{name}/schema/fields_simulation',
{
params: {
path: {
@ -77,7 +77,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});
it('Returns success status when simulation would succeed', async () => {
const response = await apiClient.fetch(
'POST /api/streams/{name}/schema/fields_simulation',
'POST /internal/streams/{name}/schema/fields_simulation',
{
params: {
path: {
@ -108,7 +108,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
await forkStream(apiClient, 'logs', forkBody);
const response = await apiClient.fetch(
'POST /api/streams/{name}/schema/fields_simulation',
'POST /internal/streams/{name}/schema/fields_simulation',
{
params: {
path: {