Mirror of https://github.com/elastic/kibana.git (synced 2025-04-24 17:59:23 -04:00)
🌊 Play nice with ES (#200253)
This PR implements two changes:

* When syncing a stream, try to PUT the current mappings to the data stream - if this fails with `illegal_argument_exception`, do a rollover instead. This is similar to how fleet handles this situation.
* Before accessing streams, check whether the current user can read the current data stream and only return it if this is the case. Users with partial read access will only see a partial tree. This doesn't apply to writing changes, as the user needs to be able to change index templates, pipelines and so on, which requires admin privileges anyway.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
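In code, the first change boils down to the following fallback pattern (a minimal sketch against the Elasticsearch JS client, leaving out the `retryTransientEsErrors` wrapper and the loop over backing data streams that the actual commit uses; `esClient`, `dataStreamName`, `writeIndexName`, and `mappings` are stand-ins):

```ts
import { Client } from '@elastic/elasticsearch';
import type { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types';

// Try to update the write index mappings in place. If Elasticsearch rejects
// the update with an illegal_argument_exception (e.g. an incompatible field
// type change), roll the data stream over instead, so the next backing index
// is created from the updated index template.
async function putMappingsOrRollover(
  esClient: Client,
  dataStreamName: string,
  writeIndexName: string,
  mappings: MappingTypeMapping['properties']
): Promise<void> {
  try {
    await esClient.indices.putMapping({ index: writeIndexName, properties: mappings });
  } catch (error: any) {
    if (typeof error.message !== 'string' || !error.message.includes('illegal_argument_exception')) {
      throw error; // unrelated failure, surface it
    }
    await esClient.indices.rollover({ alias: dataStreamName });
  }
}
```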
parent ef975b2b13
commit 8e67172861
3 changed files with 67 additions and 48 deletions
@@ -6,6 +6,7 @@
  */
 
 import { ElasticsearchClient, Logger } from '@kbn/core/server';
+import { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types';
 import { retryTransientEsErrors } from '../helpers/retry';
 
 interface DataStreamManagementOptions {
@@ -23,6 +24,7 @@ interface DeleteDataStreamOptions {
 interface RolloverDataStreamOptions {
   esClient: ElasticsearchClient;
   name: string;
+  mappings: MappingTypeMapping['properties'] | undefined;
   logger: Logger;
 }
 
@@ -56,38 +58,37 @@ export async function rolloverDataStreamIfNecessary({
   esClient,
   name,
   logger,
+  mappings,
 }: RolloverDataStreamOptions) {
   const dataStreams = await esClient.indices.getDataStream({ name: `${name},${name}.*` });
   for (const dataStream of dataStreams.data_streams) {
-    const currentMappings =
-      Object.values(
-        await esClient.indices.getMapping({
-          index: dataStream.indices.at(-1)?.index_name,
-        })
-      )[0].mappings.properties || {};
-    const simulatedIndex = await esClient.indices.simulateIndexTemplate({ name: dataStream.name });
-    const simulatedMappings = simulatedIndex.template.mappings.properties || {};
-
-    // check whether the same fields and same types are listed (don't check for other mapping attributes)
-    const isDifferent =
-      Object.values(simulatedMappings).length !== Object.values(currentMappings).length ||
-      Object.entries(simulatedMappings || {}).some(([fieldName, { type }]) => {
-        const currentType = currentMappings[fieldName]?.type;
-        return currentType !== type;
-      });
-
-    if (!isDifferent) {
+    const writeIndex = dataStream.indices.at(-1);
+    if (!writeIndex) {
       continue;
     }
-
     try {
-      await retryTransientEsErrors(() => esClient.indices.rollover({ alias: dataStream.name }), {
-        logger,
-      });
-      logger.debug(() => `Rolled over data stream: ${dataStream.name}`);
+      await retryTransientEsErrors(
+        () => esClient.indices.putMapping({ index: writeIndex.index_name, properties: mappings }),
+        {
+          logger,
+        }
+      );
     } catch (error: any) {
-      logger.error(`Error rolling over data stream: ${error.message}`);
-      throw error;
+      if (
+        typeof error.message !== 'string' ||
+        !error.message.includes('illegal_argument_exception')
+      ) {
+        throw error;
+      }
+      try {
+        await retryTransientEsErrors(() => esClient.indices.rollover({ alias: dataStream.name }), {
+          logger,
+        });
+        logger.debug(() => `Rolled over data stream: ${dataStream.name}`);
+      } catch (rolloverError: any) {
+        logger.error(`Error rolling over data stream: ${error.message}`);
+        throw error;
+      }
     }
   }
 }
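For context on the fallback in the last hunk: Elasticsearch only accepts additive or compatible mapping updates on an existing index, so a change such as switching an existing field's type is rejected with an `illegal_argument_exception`, while a rollover creates a fresh backing index that picks the new mappings up from the updated template. A hypothetical pair of requests illustrating the two paths (the backing index and field names are stand-ins, with `esClient` an Elasticsearch client as above):

```ts
// Accepted in place: adding a brand-new field to the current write index.
await esClient.indices.putMapping({
  index: '.ds-mystream-000001',
  properties: { new_field: { type: 'keyword' } },
});

// Rejected with illegal_argument_exception: changing an existing field's
// type (e.g. keyword to long), which is what triggers the rollover fallback.
await esClient.indices.putMapping({
  index: '.ds-mystream-000001',
  properties: { existing_field: { type: 'long' } },
});
```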
@@ -112,6 +112,7 @@ export async function listStreams({
 
 interface ReadStreamParams extends BaseParams {
   id: string;
+  skipAccessCheck?: boolean;
 }
 
 export interface ReadStreamResponse {
@@ -121,6 +122,7 @@
 export async function readStream({
   id,
   scopedClusterClient,
+  skipAccessCheck,
 }: ReadStreamParams): Promise<ReadStreamResponse> {
   try {
     const response = await scopedClusterClient.asInternalUser.get<StreamDefinition>({
@@ -128,6 +130,12 @@ export async function readStream({
       index: STREAMS_INDEX,
     });
     const definition = response._source as StreamDefinition;
+    if (!skipAccessCheck) {
+      const hasAccess = await checkReadAccess({ id, scopedClusterClient });
+      if (!hasAccess) {
+        throw new DefinitionNotFound(`Stream definition for ${id} not found.`);
+      }
+    }
     return {
       definition,
     };
@@ -249,6 +257,21 @@ export async function checkStreamExists({ id, scopedClusterClient }: ReadStreamP
   }
 }
 
+interface CheckReadAccessParams extends BaseParams {
+  id: string;
+}
+
+export async function checkReadAccess({
+  id,
+  scopedClusterClient,
+}: CheckReadAccessParams): Promise<boolean> {
+  try {
+    return await scopedClusterClient.asCurrentUser.indices.exists({ index: id });
+  } catch (e) {
+    return false;
+  }
+}
+
 interface SyncStreamParams {
   scopedClusterClient: IScopedClusterClient;
   definition: StreamDefinition;
@@ -262,10 +285,11 @@ export async function syncStream({
   rootDefinition,
   logger,
 }: SyncStreamParams) {
+  const componentTemplate = generateLayer(definition.id, definition);
   await upsertComponent({
     esClient: scopedClusterClient.asCurrentUser,
     logger,
-    component: generateLayer(definition.id, definition),
+    component: componentTemplate,
   });
   await upsertIngestPipeline({
     esClient: scopedClusterClient.asCurrentUser,
@@ -308,5 +332,6 @@ export async function syncStream({
     esClient: scopedClusterClient.asCurrentUser,
     name: definition.id,
     logger,
+    mappings: componentTemplate.template.mappings?.properties,
   });
 }
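The access check above piggybacks on `indices.exists` executed as the current user: if that user lacks read privileges on the stream's backing indices, the call returns `false` or throws a security error, which the `catch` also maps to `false`, and `readStream` then reports the stream as not found instead of revealing that it exists. A hypothetical sketch of how this produces the partial tree mentioned in the commit message, assuming `checkReadAccess` and the types from the hunks above are in scope (`filterReadable` itself is a stand-in, not a helper from this commit):

```ts
// Keep only the stream definitions the current user can read; a tree built
// from the surviving definitions is a partial tree for a partially
// privileged user.
async function filterReadable(
  definitions: StreamDefinition[],
  scopedClusterClient: IScopedClusterClient
): Promise<StreamDefinition[]> {
  const readable = await Promise.all(
    definitions.map((definition) => checkReadAccess({ id: definition.id, scopedClusterClient }))
  );
  return definitions.filter((_, index) => readable[index]);
}
```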
@@ -52,32 +52,25 @@ export interface StreamTree {
   children: StreamTree[];
 }
 
-function asTrees(definitions: StreamDefinition[]): StreamTree[] {
-  const nodes = new Map<string, StreamTree>();
+function asTrees(definitions: Array<{ id: string }>) {
+  const trees: StreamTree[] = [];
+  const ids = definitions.map((definition) => definition.id);
 
-  const rootNodes = new Set<StreamTree>();
+  ids.sort((a, b) => a.split('.').length - b.split('.').length);
 
-  function getNode(id: string) {
-    let node = nodes.get(id);
-    if (!node) {
-      node = { id, children: [] };
-      nodes.set(id, node);
+  ids.forEach((id) => {
+    let currentTree = trees;
+    let existingNode: StreamTree | undefined;
+    // traverse the tree following the prefix of the current id.
+    // once we reach the leaf, the current id is added as child - this works because the ids are sorted by depth
+    while ((existingNode = currentTree.find((node) => id.startsWith(node.id)))) {
+      currentTree = existingNode.children;
     }
-    return node;
-  }
-
-  definitions.forEach((definition) => {
-    const path = definition.id.split('.');
-    const parentId = path.slice(0, path.length - 1).join('.');
-    const parentNode = parentId.length ? getNode(parentId) : undefined;
-    const selfNode = getNode(definition.id);
-
-    if (parentNode) {
-      parentNode.children.push(selfNode);
-    } else {
-      rootNodes.add(selfNode);
+    if (!existingNode) {
+      const newNode = { id, children: [] };
+      currentTree.push(newNode);
     }
   });
 
-  return Array.from(rootNodes.values());
+  return trees;
 }
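The rewritten `asTrees` derives the hierarchy purely from dot-delimited id prefixes: sorting ids by segment count guarantees ancestors are inserted before descendants, and the `while` loop walks down whichever existing node's id prefixes the current one. A quick worked example (expected result shown in comments):

```ts
const tree = asTrees([{ id: 'logs' }, { id: 'logs.nginx' }, { id: 'logs.nginx.errors' }]);
// [
//   {
//     id: 'logs',
//     children: [
//       { id: 'logs.nginx', children: [{ id: 'logs.nginx.errors', children: [] }] },
//     ],
//   },
// ]
```

This also lines up with the partial-read-access behavior described in the commit message: if a user can read `logs.nginx` but not `logs`, no existing node prefixes it, so `logs.nginx` simply becomes a root of the partial tree, whereas the previous `getNode`-based version attached it to a phantom `logs` node that was never added to the returned roots.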