[Reporting/CSV] Resolve max_concurrent_shard_requests issue (#182536)

## Summary

There has been a consistent failure in a Discover-related test set in
the kibana-ES-serverless verification job, meaning that ES-Kibana
compatibility has drifted.

Error details:
```
 + "Encountered an unknown error: status_exception
 + 	Root causes:
 + 		status_exception: Parameter validation failed for [/_search]: The http parameter [max_concurrent_shard_requests] (with value [5]) is not permitted when running in serverless mode"
 + "Encountered an error with the number of CSV rows generated fromthe search: expected rows were indeterminable, received 0."
       at Context.<anonymous> (reporting.ts:182:33)
       at processTicksAndRejections (node:internal/process/task_queues:95:5)
       at Object.apply (wrap_function.js:73:16)
```

This tracked back to a feature added for reporting awhile back, which
created a config schema field for the `max_concurrent_shard_requests`
parameter in the search queries:
https://github.com/elastic/kibana/pull/170344/files#diff-7bceb37eef3761e1161cf04f41668dd9195bfac9fea36e734a230b5ed878a974

Most of the changes in this PR are in test code. I created "Test" which
extend protected methods in the original classes. This was done to
remove the `@ts-expect-errors` lines of code.
This commit is contained in:
Tim Sullivan 2024-05-03 07:30:45 -07:00 committed by GitHub
parent df0949469a
commit f51c5c92bc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 264 additions and 77 deletions

View file

@ -152,6 +152,7 @@ xpack.screenshotting.enabled: false
xpack.reporting.queue.pollInterval: 3m
xpack.reporting.roles.enabled: false
xpack.reporting.statefulSettings.enabled: false
xpack.reporting.csv.maxConcurrentShardRequests: 0
# Disabled Observability plugins
xpack.ux.enabled: false

View file

@ -6,22 +6,53 @@
* Side Public License, v 1.
*/
import * as Rx from 'rxjs';
import type { estypes } from '@elastic/elasticsearch';
import type { IScopedClusterClient, Logger } from '@kbn/core/server';
import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks';
import type { ISearchClient } from '@kbn/data-plugin/common';
import { createSearchSourceMock } from '@kbn/data-plugin/common/search/search_source/mocks';
import { createSearchRequestHandlerContext } from '@kbn/data-plugin/server/search/mocks';
import type { SearchCursor, SearchCursorSettings } from './search_cursor';
import type { SearchCursorSettings } from './search_cursor';
import { SearchCursorPit } from './search_cursor_pit';
class TestSearchCursorPit extends SearchCursorPit {
constructor(...args: ConstructorParameters<typeof SearchCursorPit>) {
super(...args);
}
public getCursorId() {
return this.cursorId;
}
public openPointInTime() {
return super.openPointInTime();
}
public searchWithPit(...args: Parameters<SearchCursorPit['searchWithPit']>) {
return super.searchWithPit(...args);
}
public setSearchAfter(...args: Parameters<SearchCursorPit['setSearchAfter']>) {
return super.setSearchAfter(...args);
}
public getSearchAfter() {
return super.getSearchAfter();
}
}
describe('CSV Export Search Cursor', () => {
let settings: SearchCursorSettings;
let es: IScopedClusterClient;
let data: ISearchClient;
let logger: Logger;
let cursor: SearchCursor;
let cursor: TestSearchCursorPit;
beforeEach(async () => {
let openPointInTimeSpy: jest.SpyInstance<Promise<estypes.OpenPointInTimeResponse>>;
beforeEach(() => {
settings = {
scroll: {
duration: jest.fn(() => '10m'),
@ -34,56 +65,116 @@ describe('CSV Export Search Cursor', () => {
es = elasticsearchServiceMock.createScopedClusterClient();
data = createSearchRequestHandlerContext();
jest.spyOn(es.asCurrentUser, 'openPointInTime').mockResolvedValue({ id: 'somewhat-pit-id' });
openPointInTimeSpy = jest
.spyOn(es.asCurrentUser, 'openPointInTime')
.mockResolvedValue({ id: 'somewhat-pit-id' });
logger = loggingSystemMock.createLogger();
cursor = new SearchCursorPit(
'test-index-pattern-string',
settings,
{ data, es },
new AbortController(),
logger
);
const openPointInTimeSpy = jest
// @ts-expect-error create spy on private method
.spyOn(cursor, 'openPointInTime');
await cursor.initialize();
expect(openPointInTimeSpy).toBeCalledTimes(1);
});
it('supports point-in-time', async () => {
const searchWithPitSpy = jest
// @ts-expect-error create spy on private method
.spyOn(cursor, 'searchWithPit')
// @ts-expect-error mock resolved value for spy on private method
.mockResolvedValueOnce({ rawResponse: { hits: [] } });
describe('with default settings', () => {
beforeEach(async () => {
cursor = new TestSearchCursorPit(
'test-index-pattern-string',
settings,
{ data, es },
new AbortController(),
logger
);
const searchSource = createSearchSourceMock();
await cursor.getPage(searchSource);
expect(searchWithPitSpy).toBeCalledTimes(1);
await cursor.initialize();
expect(openPointInTimeSpy).toBeCalledTimes(1);
});
it('supports pit and max_concurrent_shard_requests', async () => {
const dataSearchSpy = jest
.spyOn(data, 'search')
.mockReturnValue(Rx.of({ rawResponse: { hits: { hits: [] } } }));
const searchSource = createSearchSourceMock();
await cursor.getPage(searchSource);
expect(dataSearchSpy).toBeCalledTimes(1);
expect(dataSearchSpy).toBeCalledWith(
{
params: {
body: expect.objectContaining({ pit: { id: 'somewhat-pit-id', keep_alive: '10m' } }),
max_concurrent_shard_requests: 5,
},
},
expect.objectContaining({
strategy: 'es',
transport: { maxRetries: 0, requestTimeout: '10m' },
})
);
});
it('can update internal cursor ID', () => {
cursor.updateIdFromResults({ pit_id: 'very-typical-pit-id', hits: { hits: [] } });
expect(cursor.getCursorId()).toBe('very-typical-pit-id');
});
it('manages search_after', () => {
cursor.setSearchAfter([
{
_index: 'test-index',
_id: 'test-doc-id',
sort: ['Wed Jan 17 15:35:47 MST 2024', 42],
},
]);
expect(cursor.getSearchAfter()).toEqual(['Wed Jan 17 15:35:47 MST 2024', 42]);
});
});
it('can update internal cursor ID', () => {
cursor.updateIdFromResults({ pit_id: 'very-typical-pit-id', hits: { hits: [] } });
// @ts-expect-error private field
expect(cursor.cursorId).toBe('very-typical-pit-id');
});
describe('with max_concurrent_shard_requests=0', () => {
beforeEach(async () => {
settings.maxConcurrentShardRequests = 0;
it('manages search_after', () => {
// @ts-expect-error access private method
cursor.setSearchAfter([
{
_index: 'test-index',
_id: 'test-doc-id',
sort: ['Wed Jan 17 15:35:47 MST 2024', 42],
},
]);
cursor = new TestSearchCursorPit(
'test-index-pattern-string',
settings,
{ data, es },
new AbortController(),
logger
);
// @ts-expect-error access private method
expect(cursor.getSearchAfter()).toEqual(['Wed Jan 17 15:35:47 MST 2024', 42]);
await cursor.initialize();
expect(openPointInTimeSpy).toBeCalledTimes(1);
});
it('suppresses max_concurrent_shard_requests from search body', async () => {
const dataSearchSpy = jest
.spyOn(data, 'search')
.mockReturnValue(Rx.of({ rawResponse: { hits: { hits: [] } } }));
const searchSource = createSearchSourceMock();
await cursor.getPage(searchSource);
expect(dataSearchSpy).toBeCalledTimes(1);
expect(dataSearchSpy).toBeCalledWith(
{
params: {
body: {
fields: [],
pit: { id: 'somewhat-pit-id', keep_alive: '10m' },
query: { bool: { filter: [], must: [], must_not: [], should: [] } },
runtime_mappings: {},
script_fields: {},
stored_fields: ['*'],
},
max_concurrent_shard_requests: undefined,
},
},
{
abortSignal: expect.any(AbortSignal),
strategy: 'es',
transport: { maxRetries: 0, requestTimeout: '10m' },
}
);
});
});
});

View file

@ -37,7 +37,7 @@ export class SearchCursorPit extends SearchCursor {
this.cursorId = await this.openPointInTime();
}
private async openPointInTime() {
protected async openPointInTime() {
const { includeFrozen, maxConcurrentShardRequests, scroll, taskInstanceFields } = this.settings;
let pitId: string | undefined;
@ -74,13 +74,17 @@ export class SearchCursorPit extends SearchCursor {
return pitId;
}
private async searchWithPit(searchBody: SearchRequest) {
protected async searchWithPit(searchBody: SearchRequest) {
const { maxConcurrentShardRequests, scroll, taskInstanceFields } = this.settings;
// maxConcurrentShardRequests=0 is not supported
const effectiveMaxConcurrentShardRequests =
maxConcurrentShardRequests > 0 ? maxConcurrentShardRequests : undefined;
const searchParamsPit = {
params: {
body: searchBody,
max_concurrent_shard_requests: maxConcurrentShardRequests,
max_concurrent_shard_requests: effectiveMaxConcurrentShardRequests,
},
};
@ -146,14 +150,14 @@ export class SearchCursorPit extends SearchCursor {
this.setSearchAfter(hits); // for pit only
}
private getSearchAfter() {
protected getSearchAfter() {
return this.searchAfter;
}
/**
* For managing the search_after parameter, needed for paging using point-in-time
*/
private setSearchAfter(hits: Array<estypes.SearchHit<unknown>>) {
protected setSearchAfter(hits: Array<estypes.SearchHit<unknown>>) {
// Update last sort results for next query. PIT is used, so the sort results
// automatically include _shard_doc as a tiebreaker
this.searchAfter = hits[hits.length - 1]?.sort as estypes.SortResults | undefined;

View file

@ -6,22 +6,34 @@
* Side Public License, v 1.
*/
import * as Rx from 'rxjs';
import type { IScopedClusterClient, Logger } from '@kbn/core/server';
import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks';
import type { ISearchClient } from '@kbn/data-plugin/common';
import { createSearchSourceMock } from '@kbn/data-plugin/common/search/search_source/mocks';
import { createSearchRequestHandlerContext } from '@kbn/data-plugin/server/search/mocks';
import type { SearchCursor, SearchCursorSettings } from './search_cursor';
import type { SearchCursorSettings } from './search_cursor';
import { SearchCursorScroll } from './search_cursor_scroll';
class TestSearchCursorScroll extends SearchCursorScroll {
constructor(...args: ConstructorParameters<typeof SearchCursorScroll>) {
super(...args);
}
public getCursorId() {
return this.cursorId;
}
}
describe('CSV Export Search Cursor', () => {
let settings: SearchCursorSettings;
let es: IScopedClusterClient;
let data: ISearchClient;
let logger: Logger;
let cursor: SearchCursor;
let cursor: TestSearchCursorScroll;
beforeEach(async () => {
beforeEach(() => {
settings = {
scroll: {
duration: jest.fn(() => '10m'),
@ -37,33 +49,108 @@ describe('CSV Export Search Cursor', () => {
jest.spyOn(es.asCurrentUser, 'openPointInTime').mockResolvedValue({ id: 'simply-scroll-id' });
logger = loggingSystemMock.createLogger();
cursor = new SearchCursorScroll(
'test-index-pattern-string',
settings,
{ data, es },
new AbortController(),
logger
);
await cursor.initialize();
});
it('supports scan/scroll', async () => {
const scanSpy = jest
// @ts-expect-error create spy on private method
.spyOn(cursor, 'scan')
// @ts-expect-error mock resolved value for spy on private method
.mockResolvedValueOnce({ rawResponse: { hits: [] } });
describe('with default settings', () => {
beforeEach(async () => {
cursor = new TestSearchCursorScroll(
'test-index-pattern-string',
settings,
{ data, es },
new AbortController(),
logger
);
const searchSource = createSearchSourceMock();
await cursor.getPage(searchSource);
expect(scanSpy).toBeCalledTimes(1);
await cursor.initialize();
});
it('supports scan/scroll and max_concurrent_shard_requests', async () => {
const dataSearchSpy = jest
.spyOn(data, 'search')
.mockReturnValue(Rx.of({ rawResponse: { hits: { hits: [] } } }));
const searchSource = createSearchSourceMock();
await cursor.getPage(searchSource);
expect(dataSearchSpy).toBeCalledTimes(1);
expect(dataSearchSpy).toBeCalledWith(
{
params: {
body: {
fields: [],
query: { bool: { filter: [], must: [], must_not: [], should: [] } },
runtime_mappings: {},
script_fields: {},
stored_fields: ['*'],
},
ignore_throttled: undefined,
index: 'test-index-pattern-string',
max_concurrent_shard_requests: 5,
scroll: '10m',
size: 500,
},
},
{
abortSignal: expect.any(AbortSignal),
strategy: 'es',
transport: { maxRetries: 0, requestTimeout: '10m' },
}
);
});
it('can update internal cursor ID', () => {
cursor.updateIdFromResults({ _scroll_id: 'not-unusual-scroll-id' });
expect(cursor.getCursorId()).toBe('not-unusual-scroll-id');
});
});
it('can update internal cursor ID', () => {
cursor.updateIdFromResults({ _scroll_id: 'not-unusual-scroll-id', hits: { hits: [] } });
// @ts-expect-error private field
expect(cursor.cursorId).toBe('not-unusual-scroll-id');
describe('with max_concurrent_shard_requests=0', () => {
beforeEach(async () => {
settings.maxConcurrentShardRequests = 0;
cursor = new TestSearchCursorScroll(
'test-index-pattern-string',
settings,
{ data, es },
new AbortController(),
logger
);
await cursor.initialize();
});
it('suppresses max_concurrent_shard_requests from search body', async () => {
const dataSearchSpy = jest
.spyOn(data, 'search')
.mockReturnValue(Rx.of({ rawResponse: { hits: { hits: [] } } }));
const searchSource = createSearchSourceMock();
await cursor.getPage(searchSource);
expect(dataSearchSpy).toBeCalledTimes(1);
expect(dataSearchSpy).toBeCalledWith(
{
params: {
body: {
fields: [],
query: { bool: { filter: [], must: [], must_not: [], should: [] } },
runtime_mappings: {},
script_fields: {},
stored_fields: ['*'],
},
ignore_throttled: undefined,
index: 'test-index-pattern-string',
max_concurrent_shard_requests: undefined,
scroll: '10m',
size: 500,
},
},
{
abortSignal: expect.any(AbortSignal),
strategy: 'es',
transport: { maxRetries: 0, requestTimeout: '10m' },
}
);
});
});
});

View file

@ -35,6 +35,10 @@ export class SearchCursorScroll extends SearchCursor {
private async scan(searchBody: SearchRequest) {
const { includeFrozen, maxConcurrentShardRequests, scroll, taskInstanceFields } = this.settings;
// maxConcurrentShardRequests=0 is not supported
const effectiveMaxConcurrentShardRequests =
maxConcurrentShardRequests > 0 ? maxConcurrentShardRequests : undefined;
const searchParamsScan = {
params: {
body: searchBody,
@ -42,7 +46,7 @@ export class SearchCursorScroll extends SearchCursor {
scroll: scroll.duration(taskInstanceFields),
size: scroll.size,
ignore_throttled: includeFrozen ? false : undefined, // "true" will cause deprecation warnings logged in ES
max_concurrent_shard_requests: maxConcurrentShardRequests,
max_concurrent_shard_requests: effectiveMaxConcurrentShardRequests,
},
};