[Streams] Refactoring streams routes (#206526)

## Summary

This PR consolidates the multiple `server/streams/*` route files into 4
`route.ts` files to optimize the Typescript parsing. I tried to organize
the routes into 4 logical groups:

- CRUD - edit, list, read, delete
- Management - fork, resync, status, sample
- Schema - unmapped fields, schema simulation
- Enablement - disable, enable

I left everything else "as is" since @dgieselaar is currently doing a
refactor to consolidate most of the features into a new `StreamsClient`
similar to the `AssetClient`
This commit is contained in:
Chris Cowan 2025-01-16 09:53:01 -07:00 committed by GitHub
parent 10a221553a
commit 24f5153d85
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 635 additions and 749 deletions

View file

@ -5,40 +5,22 @@
* 2.0.
*/
import { dashboardRoutes } from './dashboards/route';
import { esqlRoutes } from './esql/route';
import { deleteStreamRoute } from './streams/delete';
import { streamDetailRoute } from './streams/details';
import { disableStreamsRoute } from './streams/disable';
import { editStreamRoute } from './streams/edit';
import { enableStreamsRoute } from './streams/enable';
import { forkStreamsRoute } from './streams/fork';
import { listStreamsRoute } from './streams/list';
import { readStreamRoute } from './streams/read';
import { resyncStreamsRoute } from './streams/resync';
import { sampleStreamRoute } from './streams/sample';
import { schemaFieldsSimulationRoute } from './streams/schema/fields_simulation';
import { unmappedFieldsRoute } from './streams/schema/unmapped_fields';
import { simulateProcessorRoute } from './streams/processing/simulate';
import { streamsStatusRoutes } from './streams/settings';
import { dashboardRoutes } from './dashboards/route';
import { crudRoutes } from './streams/crud/route';
import { enablementRoutes } from './streams/enablement/route';
import { managementRoutes } from './streams/management/route';
import { schemaRoutes } from './streams/schema/route';
import { processingRoutes } from './streams/processing/route';
export const streamsRouteRepository = {
...enableStreamsRoute,
...resyncStreamsRoute,
...forkStreamsRoute,
...readStreamRoute,
...editStreamRoute,
...deleteStreamRoute,
...listStreamsRoute,
...streamsStatusRoutes,
...esqlRoutes,
...disableStreamsRoute,
...dashboardRoutes,
...sampleStreamRoute,
...streamDetailRoute,
...unmappedFieldsRoute,
...simulateProcessorRoute,
...schemaFieldsSimulationRoute,
...crudRoutes,
...enablementRoutes,
...managementRoutes,
...schemaRoutes,
...processingRoutes,
};
export type StreamsRouteRepository = typeof streamsRouteRepository;

View file

@ -0,0 +1,286 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { badRequest, internal, notFound } from '@hapi/boom';
import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/types';
import {
streamConfigDefinitionSchema,
ListStreamsResponse,
FieldDefinitionConfig,
ReadStreamDefinition,
WiredReadStreamDefinition,
isWiredStream,
} from '@kbn/streams-schema';
import { isResponseError } from '@kbn/es-errors';
import { MalformedStreamId } from '../../../lib/streams/errors/malformed_stream_id';
import {
DefinitionNotFound,
ForkConditionMissing,
IndexTemplateNotFound,
RootStreamImmutabilityException,
SecurityException,
} from '../../../lib/streams/errors';
import { createServerRoute } from '../../create_server_route';
import { getDataStreamLifecycle } from '../../../lib/streams/stream_crud';
export const readStreamRoute = createServerRoute({
endpoint: 'GET /api/streams/{id}',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({ id: z.string() }),
}),
handler: async ({ params, request, getScopedClients }): Promise<ReadStreamDefinition> => {
try {
const { assetClient, streamsClient } = await getScopedClients({
request,
});
const name = params.path.id;
const [streamDefinition, dashboards, ancestors, dataStream] = await Promise.all([
streamsClient.getStream(name),
assetClient.getAssetIds({
entityId: name,
entityType: 'stream',
assetType: 'dashboard',
}),
streamsClient.getAncestors(name),
streamsClient.getDataStream(name),
]);
const lifecycle = getDataStreamLifecycle(dataStream);
if (!isWiredStream(streamDefinition)) {
return {
...streamDefinition,
lifecycle,
dashboards,
inherited_fields: {},
};
}
const body: WiredReadStreamDefinition = {
...streamDefinition,
dashboards,
lifecycle,
inherited_fields: ancestors.reduce((acc, def) => {
Object.entries(def.stream.ingest.wired.fields).forEach(([key, fieldDef]) => {
acc[key] = { ...fieldDef, from: def.name };
});
return acc;
// TODO: replace this with a proper type
}, {} as Record<string, FieldDefinitionConfig & { from: string }>),
};
return body;
} catch (e) {
if (e instanceof DefinitionNotFound || (isResponseError(e) && e.statusCode === 404)) {
throw notFound(e);
}
throw internal(e);
}
},
});
export interface StreamDetailsResponse {
details: {
count: number;
};
}
export const streamDetailRoute = createServerRoute({
endpoint: 'GET /api/streams/{id}/_details',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({ id: z.string() }),
query: z.object({
start: z.string(),
end: z.string(),
}),
}),
handler: async ({ params, request, getScopedClients }): Promise<StreamDetailsResponse> => {
try {
const { scopedClusterClient, streamsClient } = await getScopedClients({ request });
const streamEntity = await streamsClient.getStream(params.path.id);
// check doc count
const docCountResponse = await scopedClusterClient.asCurrentUser.search({
index: streamEntity.name,
body: {
track_total_hits: true,
query: {
range: {
'@timestamp': {
gte: params.query.start,
lte: params.query.end,
},
},
},
size: 0,
},
});
const count = (docCountResponse.hits.total as SearchTotalHits).value;
return {
details: {
count,
},
};
} catch (e) {
if (e instanceof DefinitionNotFound) {
throw notFound(e);
}
throw internal(e);
}
},
});
export const listStreamsRoute = createServerRoute({
endpoint: 'GET /api/streams',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({}),
handler: async ({ request, getScopedClients }): Promise<ListStreamsResponse> => {
try {
const { streamsClient } = await getScopedClients({ request });
return {
streams: await streamsClient.listStreams(),
};
} catch (e) {
if (e instanceof DefinitionNotFound) {
throw notFound(e);
}
throw internal(e);
}
},
});
export const editStreamRoute = createServerRoute({
endpoint: 'PUT /api/streams/{id}',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({
id: z.string(),
}),
body: streamConfigDefinitionSchema,
}),
handler: async ({ params, request, getScopedClients }) => {
try {
const { streamsClient } = await getScopedClients({ request });
const streamDefinition = { stream: params.body, name: params.path.id };
return await streamsClient.upsertStream({ definition: streamDefinition });
} catch (e) {
if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) {
throw notFound(e);
}
if (
e instanceof SecurityException ||
e instanceof ForkConditionMissing ||
e instanceof MalformedStreamId ||
e instanceof RootStreamImmutabilityException
) {
throw badRequest(e);
}
throw internal(e);
}
},
});
export const deleteStreamRoute = createServerRoute({
endpoint: 'DELETE /api/streams/{id}',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({
id: z.string(),
}),
}),
handler: async ({ params, request, getScopedClients }): Promise<{ acknowledged: true }> => {
try {
const { streamsClient } = await getScopedClients({
request,
});
await streamsClient.deleteStream(params.path.id);
return { acknowledged: true };
} catch (e) {
if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) {
throw notFound(e);
}
if (
e instanceof SecurityException ||
e instanceof ForkConditionMissing ||
e instanceof MalformedStreamId
) {
throw badRequest(e);
}
throw internal(e);
}
},
});
export const crudRoutes = {
...readStreamRoute,
...streamDetailRoute,
...listStreamsRoute,
...editStreamRoute,
...deleteStreamRoute,
};

View file

@ -1,66 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { badRequest, internal, notFound } from '@hapi/boom';
import { z } from '@kbn/zod';
import {
DefinitionNotFound,
ForkConditionMissing,
IndexTemplateNotFound,
SecurityException,
} from '../../lib/streams/errors';
import { MalformedStreamId } from '../../lib/streams/errors/malformed_stream_id';
import { createServerRoute } from '../create_server_route';
export const deleteStreamRoute = createServerRoute({
endpoint: 'DELETE /api/streams/{id}',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({
id: z.string(),
}),
}),
handler: async ({
params,
logger,
request,
getScopedClients,
}): Promise<{ acknowledged: true }> => {
try {
const { streamsClient } = await getScopedClients({
request,
});
await streamsClient.deleteStream(params.path.id);
return { acknowledged: true };
} catch (e) {
if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) {
throw notFound(e);
}
if (
e instanceof SecurityException ||
e instanceof ForkConditionMissing ||
e instanceof MalformedStreamId
) {
throw badRequest(e);
}
throw internal(e);
}
},
});

View file

@ -1,82 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { notFound, internal } from '@hapi/boom';
import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/types';
import { createServerRoute } from '../create_server_route';
import { DefinitionNotFound } from '../../lib/streams/errors';
export interface StreamDetailsResponse {
details: {
count: number;
};
}
export const streamDetailRoute = createServerRoute({
endpoint: 'GET /api/streams/{id}/_details',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({ id: z.string() }),
query: z.object({
start: z.string(),
end: z.string(),
}),
}),
handler: async ({
response,
params,
request,
logger,
getScopedClients,
}): Promise<StreamDetailsResponse> => {
try {
const { scopedClusterClient, streamsClient } = await getScopedClients({ request });
const streamEntity = await streamsClient.getStream(params.path.id);
// check doc count
const docCountResponse = await scopedClusterClient.asCurrentUser.search({
index: streamEntity.name,
body: {
track_total_hits: true,
query: {
range: {
'@timestamp': {
gte: params.query.start,
lte: params.query.end,
},
},
},
size: 0,
},
});
const count = (docCountResponse.hits.total as SearchTotalHits).value;
return {
details: {
count,
},
};
} catch (e) {
if (e instanceof DefinitionNotFound) {
throw notFound(e);
}
throw internal(e);
}
},
});

View file

@ -1,37 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { badRequest, internal } from '@hapi/boom';
import { z } from '@kbn/zod';
import { DisableStreamsResponse } from '../../lib/streams/client';
import { SecurityException } from '../../lib/streams/errors';
import { createServerRoute } from '../create_server_route';
export const disableStreamsRoute = createServerRoute({
endpoint: 'POST /api/streams/_disable',
params: z.object({}),
options: {
access: 'internal',
},
security: {
authz: {
requiredPrivileges: ['streams_write'],
},
},
handler: async ({ request, getScopedClients }): Promise<DisableStreamsResponse> => {
try {
const { streamsClient } = await getScopedClients({ request });
return await streamsClient.disableStreams();
} catch (e) {
if (e instanceof SecurityException) {
throw badRequest(e);
}
throw internal(e);
}
},
});

View file

@ -1,62 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { badRequest, internal, notFound } from '@hapi/boom';
import { streamConfigDefinitionSchema } from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import {
DefinitionNotFound,
ForkConditionMissing,
IndexTemplateNotFound,
RootStreamImmutabilityException,
SecurityException,
} from '../../lib/streams/errors';
import { MalformedStreamId } from '../../lib/streams/errors/malformed_stream_id';
import { createServerRoute } from '../create_server_route';
export const editStreamRoute = createServerRoute({
endpoint: 'PUT /api/streams/{id}',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({
id: z.string(),
}),
body: streamConfigDefinitionSchema,
}),
handler: async ({ params, request, getScopedClients }) => {
try {
const { streamsClient } = await getScopedClients({ request });
const streamDefinition = { stream: params.body, name: params.path.id };
return await streamsClient.upsertStream({ definition: streamDefinition });
} catch (e) {
if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) {
throw notFound(e);
}
if (
e instanceof SecurityException ||
e instanceof ForkConditionMissing ||
e instanceof MalformedStreamId ||
e instanceof RootStreamImmutabilityException
) {
throw badRequest(e);
}
throw internal(e);
}
},
});

View file

@ -7,9 +7,9 @@
import { badRequest, internal } from '@hapi/boom';
import { z } from '@kbn/zod';
import { EnableStreamsResponse } from '../../lib/streams/client';
import { SecurityException } from '../../lib/streams/errors';
import { createServerRoute } from '../create_server_route';
import { SecurityException } from '../../../lib/streams/errors';
import { createServerRoute } from '../../create_server_route';
import { DisableStreamsResponse, EnableStreamsResponse } from '../../../lib/streams/client';
export const enableStreamsRoute = createServerRoute({
endpoint: 'POST /api/streams/_enable',
@ -39,3 +39,33 @@ export const enableStreamsRoute = createServerRoute({
}
},
});
export const disableStreamsRoute = createServerRoute({
endpoint: 'POST /api/streams/_disable',
params: z.object({}),
options: {
access: 'internal',
},
security: {
authz: {
requiredPrivileges: ['streams_write'],
},
},
handler: async ({ request, getScopedClients }): Promise<DisableStreamsResponse> => {
try {
const { streamsClient } = await getScopedClients({ request });
return await streamsClient.disableStreams();
} catch (e) {
if (e instanceof SecurityException) {
throw badRequest(e);
}
throw internal(e);
}
},
});
export const enablementRoutes = {
...enableStreamsRoute,
...disableStreamsRoute,
};

View file

@ -1,79 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { badRequest, internal, notFound } from '@hapi/boom';
import { conditionSchema } from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import {
DefinitionNotFound,
ForkConditionMissing,
IndexTemplateNotFound,
RootStreamImmutabilityException,
SecurityException,
} from '../../lib/streams/errors';
import { MalformedStreamId } from '../../lib/streams/errors/malformed_stream_id';
import { validateCondition } from '../../lib/streams/helpers/condition_fields';
import { createServerRoute } from '../create_server_route';
export const forkStreamsRoute = createServerRoute({
endpoint: 'POST /api/streams/{id}/_fork',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({
id: z.string(),
}),
body: z.object({ stream: z.object({ name: z.string() }), condition: conditionSchema }),
}),
handler: async ({
params,
logger,
request,
getScopedClients,
}): Promise<{ acknowledged: true }> => {
try {
if (!params.body.condition) {
throw new ForkConditionMissing('You must provide a condition to fork a stream');
}
validateCondition(params.body.condition);
const { streamsClient } = await getScopedClients({
request,
});
return await streamsClient.forkStream({
parent: params.path.id,
condition: params.body.condition,
name: params.body.stream.name,
});
} catch (e) {
if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) {
throw notFound(e);
}
if (
e instanceof SecurityException ||
e instanceof ForkConditionMissing ||
e instanceof MalformedStreamId ||
e instanceof RootStreamImmutabilityException
) {
throw badRequest(e);
}
throw internal(e);
}
},
});

View file

@ -1,41 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { notFound, internal } from '@hapi/boom';
import { ListStreamsResponse } from '@kbn/streams-schema';
import { createServerRoute } from '../create_server_route';
import { DefinitionNotFound } from '../../lib/streams/errors';
export const listStreamsRoute = createServerRoute({
endpoint: 'GET /api/streams',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({}),
handler: async ({ request, getScopedClients }): Promise<ListStreamsResponse> => {
try {
const { streamsClient } = await getScopedClients({ request });
return {
streams: await streamsClient.listStreams(),
};
} catch (e) {
if (e instanceof DefinitionNotFound) {
throw notFound(e);
}
throw internal(e);
}
},
});

View file

@ -0,0 +1,213 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { badRequest, internal, notFound } from '@hapi/boom';
import { conditionSchema, isCompleteCondition } from '@kbn/streams-schema';
import { errors } from '@elastic/elasticsearch';
import {
DefinitionNotFound,
ForkConditionMissing,
IndexTemplateNotFound,
RootStreamImmutabilityException,
SecurityException,
} from '../../../lib/streams/errors';
import { createServerRoute } from '../../create_server_route';
import { checkAccess } from '../../../lib/streams/stream_crud';
import { MalformedStreamId } from '../../../lib/streams/errors/malformed_stream_id';
import { validateCondition } from '../../../lib/streams/helpers/condition_fields';
import { conditionToQueryDsl } from '../../../lib/streams/helpers/condition_to_query_dsl';
import { getFields } from '../../../lib/streams/helpers/condition_fields';
import { ResyncStreamsResponse } from '../../../lib/streams/client';
export const forkStreamsRoute = createServerRoute({
endpoint: 'POST /api/streams/{id}/_fork',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({
id: z.string(),
}),
body: z.object({ stream: z.object({ name: z.string() }), condition: conditionSchema }),
}),
handler: async ({ params, request, getScopedClients }): Promise<{ acknowledged: true }> => {
try {
if (!params.body.condition) {
throw new ForkConditionMissing('You must provide a condition to fork a stream');
}
validateCondition(params.body.condition);
const { streamsClient } = await getScopedClients({
request,
});
return await streamsClient.forkStream({
parent: params.path.id,
condition: params.body.condition,
name: params.body.stream.name,
});
} catch (e) {
if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) {
throw notFound(e);
}
if (
e instanceof SecurityException ||
e instanceof ForkConditionMissing ||
e instanceof MalformedStreamId ||
e instanceof RootStreamImmutabilityException
) {
throw badRequest(e);
}
throw internal(e);
}
},
});
export const resyncStreamsRoute = createServerRoute({
endpoint: 'POST /api/streams/_resync',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({}),
handler: async ({ request, getScopedClients }): Promise<ResyncStreamsResponse> => {
const { streamsClient } = await getScopedClients({ request });
return await streamsClient.resyncStreams();
},
});
export const getStreamsStatusRoute = createServerRoute({
endpoint: 'GET /api/streams/_status',
options: {
access: 'internal',
},
security: {
authz: {
requiredPrivileges: ['streams_read'],
},
},
handler: async ({ request, getScopedClients }): Promise<{ enabled: boolean }> => {
const { streamsClient } = await getScopedClients({ request });
return {
enabled: await streamsClient.isStreamsEnabled(),
};
},
});
export const sampleStreamRoute = createServerRoute({
endpoint: 'POST /api/streams/{id}/_sample',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({ id: z.string() }),
body: z.object({
condition: z.optional(conditionSchema),
start: z.optional(z.number()),
end: z.optional(z.number()),
number: z.optional(z.number()),
}),
}),
handler: async ({ params, request, getScopedClients }): Promise<{ documents: unknown[] }> => {
try {
const { scopedClusterClient } = await getScopedClients({ request });
const { read } = await checkAccess({ id: params.path.id, scopedClusterClient });
if (!read) {
throw new DefinitionNotFound(`Stream definition for ${params.path.id} not found.`);
}
const searchBody = {
query: {
bool: {
must: [
isCompleteCondition(params.body.condition)
? conditionToQueryDsl(params.body.condition)
: { match_all: {} },
{
range: {
'@timestamp': {
gte: params.body.start,
lte: params.body.end,
format: 'epoch_millis',
},
},
},
],
},
},
// Conditions could be using fields which are not indexed or they could use it with other types than they are eventually mapped as.
// Because of this we can't rely on mapped fields to draw a sample, instead we need to use runtime fields to simulate what happens during
// ingest in the painless condition checks.
// This is less efficient than it could be - in some cases, these fields _are_ indexed with the right type and we could use them directly.
// This can be optimized in the future.
runtime_mappings: Object.fromEntries(
getFields(params.body.condition).map((field) => [
field.name,
{ type: field.type === 'string' ? 'keyword' : 'double' },
])
),
sort: [
{
'@timestamp': {
order: 'desc',
},
},
],
size: params.body.number,
};
const results = await scopedClusterClient.asCurrentUser.search({
index: params.path.id,
allow_no_indices: true,
...searchBody,
});
return { documents: results.hits.hits.map((hit) => hit._source) };
} catch (error) {
if (error instanceof errors.ResponseError && error.meta.statusCode === 404) {
throw notFound(error);
}
if (error instanceof DefinitionNotFound) {
throw notFound(error);
}
throw internal(error);
}
},
});
export const managementRoutes = {
...forkStreamsRoute,
...resyncStreamsRoute,
...getStreamsStatusRoute,
...sampleStreamRoute,
};

View file

@ -174,3 +174,7 @@ const isSuccessfulDocument = (
): doc is Required<IngestSimulateSimulateDocumentResult> =>
doc.processor_results?.every((processorSimulation) => processorSimulation.status === 'success') ||
false;
export const processingRoutes = {
...simulateProcessorRoute,
};

View file

@ -1,88 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { internal, notFound } from '@hapi/boom';
import {
FieldDefinitionConfig,
ReadStreamDefinition,
WiredReadStreamDefinition,
isWiredStream,
} from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import { isResponseError } from '@kbn/es-errors';
import { DefinitionNotFound } from '../../lib/streams/errors';
import { createServerRoute } from '../create_server_route';
import { getDataStreamLifecycle } from '../../lib/streams/stream_crud';
export const readStreamRoute = createServerRoute({
endpoint: 'GET /api/streams/{id}',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({ id: z.string() }),
}),
handler: async ({ params, request, getScopedClients }): Promise<ReadStreamDefinition> => {
try {
const { assetClient, streamsClient } = await getScopedClients({
request,
});
const name = params.path.id;
const [streamDefinition, dashboards, ancestors, dataStream] = await Promise.all([
streamsClient.getStream(name),
assetClient.getAssetIds({
entityId: name,
entityType: 'stream',
assetType: 'dashboard',
}),
streamsClient.getAncestors(name),
streamsClient.getDataStream(name),
]);
const lifecycle = getDataStreamLifecycle(dataStream);
if (!isWiredStream(streamDefinition)) {
return {
...streamDefinition,
lifecycle,
dashboards,
inherited_fields: {},
};
}
const body: WiredReadStreamDefinition = {
...streamDefinition,
dashboards,
lifecycle,
inherited_fields: ancestors.reduce((acc, def) => {
Object.entries(def.stream.ingest.wired.fields).forEach(([key, fieldDef]) => {
acc[key] = { ...fieldDef, from: def.name };
});
return acc;
// TODO: replace this with a proper type
}, {} as Record<string, FieldDefinitionConfig & { from: string }>),
};
return body;
} catch (e) {
if (e instanceof DefinitionNotFound || (isResponseError(e) && e.statusCode === 404)) {
throw notFound(e);
}
throw internal(e);
}
},
});

View file

@ -1,30 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { ResyncStreamsResponse } from '../../lib/streams/client';
import { createServerRoute } from '../create_server_route';
export const resyncStreamsRoute = createServerRoute({
endpoint: 'POST /api/streams/_resync',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({}),
handler: async ({ request, getScopedClients }): Promise<ResyncStreamsResponse> => {
const { streamsClient } = await getScopedClients({ request });
return await streamsClient.resyncStreams();
},
});

View file

@ -1,104 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { notFound, internal } from '@hapi/boom';
import { errors } from '@elastic/elasticsearch';
import { conditionSchema, isCompleteCondition } from '@kbn/streams-schema';
import { createServerRoute } from '../create_server_route';
import { DefinitionNotFound } from '../../lib/streams/errors';
import { checkAccess } from '../../lib/streams/stream_crud';
import { conditionToQueryDsl } from '../../lib/streams/helpers/condition_to_query_dsl';
import { getFields } from '../../lib/streams/helpers/condition_fields';
export const sampleStreamRoute = createServerRoute({
endpoint: 'POST /api/streams/{id}/_sample',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({ id: z.string() }),
body: z.object({
condition: z.optional(conditionSchema),
start: z.optional(z.number()),
end: z.optional(z.number()),
number: z.optional(z.number()),
}),
}),
handler: async ({ params, request, getScopedClients }): Promise<{ documents: unknown[] }> => {
try {
const { scopedClusterClient } = await getScopedClients({ request });
const { read } = await checkAccess({ id: params.path.id, scopedClusterClient });
if (!read) {
throw new DefinitionNotFound(`Stream definition for ${params.path.id} not found.`);
}
const searchBody = {
query: {
bool: {
must: [
isCompleteCondition(params.body.condition)
? conditionToQueryDsl(params.body.condition)
: { match_all: {} },
{
range: {
'@timestamp': {
gte: params.body.start,
lte: params.body.end,
format: 'epoch_millis',
},
},
},
],
},
},
// Conditions could be using fields which are not indexed or they could use it with other types than they are eventually mapped as.
// Because of this we can't rely on mapped fields to draw a sample, instead we need to use runtime fields to simulate what happens during
// ingest in the painless condition checks.
// This is less efficient than it could be - in some cases, these fields _are_ indexed with the right type and we could use them directly.
// This can be optimized in the future.
runtime_mappings: Object.fromEntries(
getFields(params.body.condition).map((field) => [
field.name,
{ type: field.type === 'string' ? 'keyword' : 'double' },
])
),
sort: [
{
'@timestamp': {
order: 'desc',
},
},
],
size: params.body.number,
};
const results = await scopedClusterClient.asCurrentUser.search({
index: params.path.id,
allow_no_indices: true,
...searchBody,
});
return { documents: results.hits.hits.map((hit) => hit._source) };
} catch (error) {
if (error instanceof errors.ResponseError && error.meta.statusCode === 404) {
throw notFound(error);
}
if (error instanceof DefinitionNotFound) {
throw notFound(error);
}
throw internal(error);
}
},
});

View file

@ -4,16 +4,92 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { notFound, internal } from '@hapi/boom';
import { internal, notFound } from '@hapi/boom';
import { getFlattenedObject } from '@kbn/std';
import { fieldDefinitionConfigSchema } from '@kbn/streams-schema';
import { createServerRoute } from '../../create_server_route';
import { fieldDefinitionConfigSchema, isWiredStream } from '@kbn/streams-schema';
import { DefinitionNotFound } from '../../../lib/streams/errors';
import { checkAccess } from '../../../lib/streams/stream_crud';
import { createServerRoute } from '../../create_server_route';
const SAMPLE_SIZE = 200;
const UNMAPPED_SAMPLE_SIZE = 500;
export const unmappedFieldsRoute = createServerRoute({
endpoint: 'GET /api/streams/{id}/schema/unmapped_fields',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({ id: z.string() }),
}),
handler: async ({ params, request, getScopedClients }): Promise<{ unmappedFields: string[] }> => {
try {
const { scopedClusterClient, streamsClient } = await getScopedClients({ request });
const searchBody = {
sort: [
{
'@timestamp': {
order: 'desc',
},
},
],
size: UNMAPPED_SAMPLE_SIZE,
};
const [streamDefinition, ancestors, results] = await Promise.all([
streamsClient.getStream(params.path.id),
streamsClient.getAncestors(params.path.id),
scopedClusterClient.asCurrentUser.search({
index: params.path.id,
...searchBody,
}),
]);
const sourceFields = new Set<string>();
results.hits.hits.forEach((hit) => {
Object.keys(getFlattenedObject(hit._source as Record<string, unknown>)).forEach((field) => {
sourceFields.add(field);
});
});
// Mapped fields from the stream's definition and inherited from ancestors
const mappedFields = new Set<string>();
if (isWiredStream(streamDefinition)) {
Object.keys(streamDefinition.stream.ingest.wired.fields).forEach((name) =>
mappedFields.add(name)
);
}
for (const ancestor of ancestors) {
Object.keys(ancestor.stream.ingest.wired.fields).forEach((name) => mappedFields.add(name));
}
const unmappedFields = Array.from(sourceFields)
.filter((field) => !mappedFields.has(field))
.sort();
return { unmappedFields };
} catch (e) {
if (e instanceof DefinitionNotFound) {
throw notFound(e);
}
throw internal(e);
}
},
});
const FIELD_SIMILATION_SAMPLE_SIZE = 200;
export const schemaFieldsSimulationRoute = createServerRoute({
endpoint: 'POST /api/streams/{id}/schema/fields_simulation',
@ -71,7 +147,7 @@ export const schemaFieldsSimulationRoute = createServerRoute({
},
},
],
size: SAMPLE_SIZE,
size: FIELD_SIMILATION_SAMPLE_SIZE,
};
const sampleResults = await scopedClusterClient.asCurrentUser.search({
@ -168,7 +244,7 @@ export const schemaFieldsSimulationRoute = createServerRoute({
},
},
],
size: SAMPLE_SIZE,
size: FIELD_SIMILATION_SAMPLE_SIZE,
fields: params.body.field_definitions.map((field) => field.name),
_source: false,
};
@ -203,3 +279,8 @@ export const schemaFieldsSimulationRoute = createServerRoute({
}
},
});
export const schemaRoutes = {
...unmappedFieldsRoute,
...schemaFieldsSimulationRoute,
};

View file

@ -1,90 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { internal, notFound } from '@hapi/boom';
import { getFlattenedObject } from '@kbn/std';
import { isWiredStream } from '@kbn/streams-schema';
import { DefinitionNotFound } from '../../../lib/streams/errors';
import { createServerRoute } from '../../create_server_route';
const SAMPLE_SIZE = 500;
export const unmappedFieldsRoute = createServerRoute({
endpoint: 'GET /api/streams/{id}/schema/unmapped_fields',
options: {
access: 'internal',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
params: z.object({
path: z.object({ id: z.string() }),
}),
handler: async ({ params, request, getScopedClients }): Promise<{ unmappedFields: string[] }> => {
try {
const { scopedClusterClient, streamsClient } = await getScopedClients({ request });
const searchBody = {
sort: [
{
'@timestamp': {
order: 'desc',
},
},
],
size: SAMPLE_SIZE,
};
const [streamDefinition, ancestors, results] = await Promise.all([
streamsClient.getStream(params.path.id),
streamsClient.getAncestors(params.path.id),
scopedClusterClient.asCurrentUser.search({
index: params.path.id,
...searchBody,
}),
]);
const sourceFields = new Set<string>();
results.hits.hits.forEach((hit) => {
Object.keys(getFlattenedObject(hit._source as Record<string, unknown>)).forEach((field) => {
sourceFields.add(field);
});
});
// Mapped fields from the stream's definition and inherited from ancestors
const mappedFields = new Set<string>();
if (isWiredStream(streamDefinition)) {
Object.keys(streamDefinition.stream.ingest.wired.fields).forEach((name) =>
mappedFields.add(name)
);
}
for (const ancestor of ancestors) {
Object.keys(ancestor.stream.ingest.wired.fields).forEach((name) => mappedFields.add(name));
}
const unmappedFields = Array.from(sourceFields)
.filter((field) => !mappedFields.has(field))
.sort();
return { unmappedFields };
} catch (e) {
if (e instanceof DefinitionNotFound) {
throw notFound(e);
}
throw internal(e);
}
},
});

View file

@ -1,31 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { createServerRoute } from '../create_server_route';
export const getStreamsStatusRoute = createServerRoute({
endpoint: 'GET /api/streams/_status',
options: {
access: 'internal',
},
security: {
authz: {
requiredPrivileges: ['streams_read'],
},
},
handler: async ({ request, getScopedClients }): Promise<{ enabled: boolean }> => {
const { streamsClient } = await getScopedClients({ request });
return {
enabled: await streamsClient.isStreamsEnabled(),
};
},
});
export const streamsStatusRoutes = {
...getStreamsStatusRoute,
};