migrations incorrectly detects cluster routing allocation setting as incompatible (#131712)

* Add reproducing test case

* Fix and add integration test

* Transient settings should take preference

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Rudolf Meijering 2022-05-09 21:03:50 +02:00 committed by GitHub
parent 1f9047d214
commit 638bfbee3d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 220 additions and 120 deletions

View file

@ -6,6 +6,7 @@
* Side Public License, v 1.
*/
import * as Either from 'fp-ts/lib/Either';
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
import { errors as EsErrors } from '@elastic/elasticsearch';
jest.mock('./catch_retryable_es_client_errors');
@ -16,16 +17,16 @@ describe('initAction', () => {
beforeEach(() => {
jest.clearAllMocks();
});
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
const task = initAction({ client, indices: ['my_index'] });
try {
await task();
@ -34,4 +35,88 @@ describe('initAction', () => {
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
it('resolves right when persistent and transient cluster settings are compatible', async () => {
const clusterSettingsResponse = {
transient: { 'cluster.routing.allocation.enable': 'all' },
persistent: { 'cluster.routing.allocation.enable': 'all' },
};
const client = elasticsearchClientMock.createInternalClient(
new Promise((res) => res(clusterSettingsResponse))
);
const task = initAction({ client, indices: ['my_index'] });
const result = await task();
expect(Either.isRight(result)).toEqual(true);
});
it('resolves right when persistent and transient cluster settings are undefined', async () => {
const clusterSettingsResponse = {
transient: {},
persistent: {},
};
const client = elasticsearchClientMock.createInternalClient(
new Promise((res) => res(clusterSettingsResponse))
);
const task = initAction({ client, indices: ['my_index'] });
const result = await task();
expect(Either.isRight(result)).toEqual(true);
});
it('resolves right when persistent cluster settings are compatible', async () => {
const clusterSettingsResponse = {
transient: {},
persistent: { 'cluster.routing.allocation.enable': 'all' },
};
const client = elasticsearchClientMock.createInternalClient(
new Promise((res) => res(clusterSettingsResponse))
);
const task = initAction({ client, indices: ['my_index'] });
const result = await task();
expect(Either.isRight(result)).toEqual(true);
});
it('resolves right when transient cluster settings are compatible', async () => {
const clusterSettingsResponse = {
transient: { 'cluster.routing.allocation.enable': 'all' },
persistent: {},
};
const client = elasticsearchClientMock.createInternalClient(
new Promise((res) => res(clusterSettingsResponse))
);
const task = initAction({ client, indices: ['my_index'] });
const result = await task();
expect(Either.isRight(result)).toEqual(true);
});
it('resolves right when valid transient settings, incompatible persistent settings', async () => {
const clusterSettingsResponse = {
transient: { 'cluster.routing.allocation.enable': 'all' },
persistent: { 'cluster.routing.allocation.enable': 'primaries' },
};
const client = elasticsearchClientMock.createInternalClient(
new Promise((res) => res(clusterSettingsResponse))
);
const task = initAction({ client, indices: ['my_index'] });
const result = await task();
expect(Either.isRight(result)).toEqual(true);
});
it('resolves left when valid persistent settings, incompatible transient settings', async () => {
const clusterSettingsResponse = {
transient: { 'cluster.routing.allocation.enable': 'primaries' },
persistent: { 'cluster.routing.allocation.enable': 'alls' },
};
const client = elasticsearchClientMock.createInternalClient(
new Promise((res) => res(clusterSettingsResponse))
);
const task = initAction({ client, indices: ['my_index'] });
const result = await task();
expect(Either.isLeft(result)).toEqual(true);
});
it('resolves left when transient cluster settings are incompatible', async () => {
const clusterSettingsResponse = {
transient: { 'cluster.routing.allocation.enable': 'none' },
persistent: { 'cluster.routing.allocation.enable': 'all' },
};
const client = elasticsearchClientMock.createInternalClient(
new Promise((res) => res(clusterSettingsResponse))
);
const task = initAction({ client, indices: ['my_index'] });
const result = await task();
expect(Either.isLeft(result)).toEqual(true);
});
});

View file

@ -44,16 +44,15 @@ export const checkClusterRoutingAllocationEnabledTask =
flat_settings: true,
})
.then((settings) => {
const clusterRoutingAllocations: string[] =
// transient settings take preference over persistent settings
const clusterRoutingAllocation =
settings?.transient?.[routingAllocationEnable] ??
settings?.persistent?.[routingAllocationEnable] ??
[];
settings?.persistent?.[routingAllocationEnable];
const clusterRoutingAllocationEnabled =
[...clusterRoutingAllocations].length === 0 ||
[...clusterRoutingAllocations].every((s: string) => s === 'all'); // if set, only allow 'all'
const clusterRoutingAllocationEnabledIsAll =
clusterRoutingAllocation === undefined || clusterRoutingAllocation === 'all';
if (!clusterRoutingAllocationEnabled) {
if (!clusterRoutingAllocationEnabledIsAll) {
return Either.left({
type: 'unsupported_cluster_routing_allocation' as const,
message:

View file

@ -116,7 +116,7 @@ describe('migration actions', () => {
await client.cluster.putSettings({
body: {
persistent: {
// Remove persistent test settings
// Reset persistent test settings
cluster: { routing: { allocation: { enable: null } } },
},
},
@ -126,11 +126,11 @@ describe('migration actions', () => {
expect.assertions(1);
const task = initAction({ client, indices: ['no_such_index'] });
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Right",
"right": Object {},
}
`);
Object {
"_tag": "Right",
"right": Object {},
}
`);
});
it('resolves right record with found indices', async () => {
expect.assertions(1);
@ -149,7 +149,7 @@ describe('migration actions', () => {
})
);
});
it('resolves left with cluster routing allocation disabled', async () => {
it('resolves left when cluster.routing.allocation.enabled is incompatible', async () => {
expect.assertions(3);
await client.cluster.putSettings({
body: {
@ -164,14 +164,14 @@ describe('migration actions', () => {
indices: ['existing_index_with_docs'],
});
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Left",
"left": Object {
"message": "[unsupported_cluster_routing_allocation] The elasticsearch cluster has cluster routing allocation incorrectly set for migrations to continue.",
"type": "unsupported_cluster_routing_allocation",
},
}
`);
Object {
"_tag": "Left",
"left": Object {
"message": "[unsupported_cluster_routing_allocation] The elasticsearch cluster has cluster routing allocation incorrectly set for migrations to continue.",
"type": "unsupported_cluster_routing_allocation",
},
}
`);
await client.cluster.putSettings({
body: {
persistent: {
@ -185,14 +185,14 @@ describe('migration actions', () => {
indices: ['existing_index_with_docs'],
});
await expect(task2()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Left",
"left": Object {
"message": "[unsupported_cluster_routing_allocation] The elasticsearch cluster has cluster routing allocation incorrectly set for migrations to continue.",
"type": "unsupported_cluster_routing_allocation",
},
}
`);
Object {
"_tag": "Left",
"left": Object {
"message": "[unsupported_cluster_routing_allocation] The elasticsearch cluster has cluster routing allocation incorrectly set for migrations to continue.",
"type": "unsupported_cluster_routing_allocation",
},
}
`);
await client.cluster.putSettings({
body: {
persistent: {
@ -206,14 +206,30 @@ describe('migration actions', () => {
indices: ['existing_index_with_docs'],
});
await expect(task3()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Left",
"left": Object {
"message": "[unsupported_cluster_routing_allocation] The elasticsearch cluster has cluster routing allocation incorrectly set for migrations to continue.",
"type": "unsupported_cluster_routing_allocation",
},
}
`);
Object {
"_tag": "Left",
"left": Object {
"message": "[unsupported_cluster_routing_allocation] The elasticsearch cluster has cluster routing allocation incorrectly set for migrations to continue.",
"type": "unsupported_cluster_routing_allocation",
},
}
`);
});
it('resolves right when cluster.routing.allocation.enabled=all', async () => {
expect.assertions(1);
await client.cluster.putSettings({
body: {
persistent: {
cluster: { routing: { allocation: { enable: 'all' } } },
},
},
});
const task = initAction({
client,
indices: ['existing_index_with_docs'],
});
const result = await task();
expect(Either.isRight(result)).toBe(true);
});
});
@ -271,14 +287,14 @@ describe('migration actions', () => {
expect.assertions(1);
const task = setWriteBlock({ client, index: 'no_such_index' });
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Left",
"left": Object {
"index": "no_such_index",
"type": "index_not_found_exception",
},
}
`);
Object {
"_tag": "Left",
"left": Object {
"index": "no_such_index",
"type": "index_not_found_exception",
},
}
`);
});
});
@ -300,21 +316,21 @@ describe('migration actions', () => {
expect.assertions(1);
const task = removeWriteBlock({ client, index: 'existing_index_with_write_block_2' });
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Right",
"right": "remove_write_block_succeeded",
}
`);
Object {
"_tag": "Right",
"right": "remove_write_block_succeeded",
}
`);
});
it('resolves right if successful when an index does not have a write block', async () => {
expect.assertions(1);
const task = removeWriteBlock({ client, index: 'existing_index_without_write_block_2' });
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Right",
"right": "remove_write_block_succeeded",
}
`);
Object {
"_tag": "Right",
"right": "remove_write_block_succeeded",
}
`);
});
it('rejects if there is a non-retryable error', async () => {
expect.assertions(1);
@ -398,14 +414,14 @@ describe('migration actions', () => {
timeout: '1s',
});
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Left",
"left": Object {
"message": "[index_not_yellow_timeout] Timeout waiting for the status of the [red_index] index to become 'yellow'",
"type": "index_not_yellow_timeout",
},
}
`);
Object {
"_tag": "Left",
"left": Object {
"message": "[index_not_yellow_timeout] Timeout waiting for the status of the [red_index] index to become 'yellow'",
"type": "index_not_yellow_timeout",
},
}
`);
});
});
@ -425,14 +441,14 @@ describe('migration actions', () => {
});
expect.assertions(1);
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Right",
"right": Object {
"acknowledged": true,
"shardsAcknowledged": true,
},
}
`);
Object {
"_tag": "Right",
"right": Object {
"acknowledged": true,
"shardsAcknowledged": true,
},
}
`);
});
it('resolves right after waiting for index status to be yellow if clone target already existed', async () => {
expect.assertions(2);
@ -491,14 +507,14 @@ describe('migration actions', () => {
expect.assertions(1);
const task = cloneIndex({ client, source: 'no_such_index', target: 'clone_target_3' });
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Left",
"left": Object {
"index": "no_such_index",
"type": "index_not_found_exception",
},
}
`);
Object {
"_tag": "Left",
"left": Object {
"index": "no_such_index",
"type": "index_not_found_exception",
},
}
`);
});
it('resolves left with a index_not_yellow_timeout if clone target already exists but takes longer than the specified timeout before turning yellow', async () => {
// Create a red index
@ -527,14 +543,14 @@ describe('migration actions', () => {
})();
await expect(cloneIndexPromise).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Left",
"left": Object {
"message": "[index_not_yellow_timeout] Timeout waiting for the status of the [clone_red_index] index to become 'yellow'",
"type": "index_not_yellow_timeout",
},
}
`);
Object {
"_tag": "Left",
"left": Object {
"message": "[index_not_yellow_timeout] Timeout waiting for the status of the [clone_red_index] index to become 'yellow'",
"type": "index_not_yellow_timeout",
},
}
`);
// Now that we know timeouts work, make the index yellow again and call cloneIndex a second time to verify that it completes
@ -555,14 +571,14 @@ describe('migration actions', () => {
})();
await expect(cloneIndexPromise2).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Right",
"right": Object {
"acknowledged": true,
"shardsAcknowledged": true,
},
}
`);
Object {
"_tag": "Right",
"right": Object {
"acknowledged": true,
"shardsAcknowledged": true,
},
}
`);
});
});
@ -580,11 +596,11 @@ describe('migration actions', () => {
})()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Right",
"right": "reindex_succeeded",
}
`);
Object {
"_tag": "Right",
"right": "reindex_succeeded",
}
`);
const results = (
(await searchForOutdatedDocuments(client, {
@ -620,11 +636,11 @@ describe('migration actions', () => {
})()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Right",
"right": "reindex_succeeded",
}
`);
Object {
"_tag": "Right",
"right": "reindex_succeeded",
}
`);
const results = (
(await searchForOutdatedDocuments(client, {
@ -653,11 +669,11 @@ describe('migration actions', () => {
})()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' });
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Right",
"right": "reindex_succeeded",
}
`);
Object {
"_tag": "Right",
"right": "reindex_succeeded",
}
`);
const results = (
(await searchForOutdatedDocuments(client, {
batchSize: 1000,