Don't start pollEsNodesVersion unless someone subscribes (#56923) (#58947)

* Don't start pollEsNodesVersion unless someone subscribes

By not polling until subscribed to, we prevent verbose error logs when
the optimizer is run (which automatically skips migrations).

* Test pollEsNodeVersions behaviour

* Cleanup unused code

* PR Feedback

* Make test more stable

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Rudolf Meijering 2020-03-02 13:29:53 +01:00 committed by GitHub
parent c2aafce945
commit c20a8c1b3e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 127 additions and 63 deletions

View file

@ -33,6 +33,9 @@ import { ElasticsearchService } from './elasticsearch_service';
import { elasticsearchServiceMock } from './elasticsearch_service.mock'; import { elasticsearchServiceMock } from './elasticsearch_service.mock';
import { duration } from 'moment'; import { duration } from 'moment';
const delay = async (durationMs: number) =>
await new Promise(resolve => setTimeout(resolve, durationMs));
let elasticsearchService: ElasticsearchService; let elasticsearchService: ElasticsearchService;
const configService = configServiceMock.create(); const configService = configServiceMock.create();
const deps = { const deps = {
@ -42,7 +45,7 @@ configService.atPath.mockReturnValue(
new BehaviorSubject({ new BehaviorSubject({
hosts: ['http://1.2.3.4'], hosts: ['http://1.2.3.4'],
healthCheck: { healthCheck: {
delay: duration(2000), delay: duration(10),
}, },
ssl: { ssl: {
verificationMode: 'none', verificationMode: 'none',
@ -125,8 +128,8 @@ describe('#setup', () => {
const config = MockClusterClient.mock.calls[0][0]; const config = MockClusterClient.mock.calls[0][0];
expect(config).toMatchInlineSnapshot(` expect(config).toMatchInlineSnapshot(`
Object { Object {
"healthCheckDelay": "PT2S", "healthCheckDelay": "PT0.01S",
"hosts": Array [ "hosts": Array [
"http://8.8.8.8", "http://8.8.8.8",
], ],
@ -138,8 +141,8 @@ Object {
"certificate": "certificate-value", "certificate": "certificate-value",
"verificationMode": "none", "verificationMode": "none",
}, },
} }
`); `);
}); });
it('falls back to elasticsearch config if custom config not passed', async () => { it('falls back to elasticsearch config if custom config not passed', async () => {
const setupContract = await elasticsearchService.setup(deps); const setupContract = await elasticsearchService.setup(deps);
@ -150,8 +153,8 @@ Object {
const config = MockClusterClient.mock.calls[0][0]; const config = MockClusterClient.mock.calls[0][0];
expect(config).toMatchInlineSnapshot(` expect(config).toMatchInlineSnapshot(`
Object { Object {
"healthCheckDelay": "PT2S", "healthCheckDelay": "PT0.01S",
"hosts": Array [ "hosts": Array [
"http://1.2.3.4", "http://1.2.3.4",
], ],
@ -166,8 +169,8 @@ Object {
"keyPassphrase": undefined, "keyPassphrase": undefined,
"verificationMode": "none", "verificationMode": "none",
}, },
} }
`); `);
}); });
it('does not merge elasticsearch hosts if custom config overrides', async () => { it('does not merge elasticsearch hosts if custom config overrides', async () => {
@ -213,6 +216,45 @@ Object {
`); `);
}); });
}); });
it('esNodeVersionCompatibility$ only starts polling when subscribed to', async done => {
const mockAdminClusterClientInstance = elasticsearchServiceMock.createClusterClient();
const mockDataClusterClientInstance = elasticsearchServiceMock.createClusterClient();
MockClusterClient.mockImplementationOnce(
() => mockAdminClusterClientInstance
).mockImplementationOnce(() => mockDataClusterClientInstance);
mockAdminClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error());
const setupContract = await elasticsearchService.setup(deps);
await delay(10);
expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0);
setupContract.esNodesCompatibility$.subscribe(() => {
expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
done();
});
});
it('esNodeVersionCompatibility$ stops polling when unsubscribed from', async done => {
const mockAdminClusterClientInstance = elasticsearchServiceMock.createClusterClient();
const mockDataClusterClientInstance = elasticsearchServiceMock.createClusterClient();
MockClusterClient.mockImplementationOnce(
() => mockAdminClusterClientInstance
).mockImplementationOnce(() => mockDataClusterClientInstance);
mockAdminClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error());
const setupContract = await elasticsearchService.setup(deps);
expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0);
const sub = setupContract.esNodesCompatibility$.subscribe(async () => {
sub.unsubscribe();
await delay(100);
expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
done();
});
});
}); });
describe('#stop', () => { describe('#stop', () => {
@ -229,4 +271,27 @@ describe('#stop', () => {
expect(mockAdminClusterClientInstance.close).toHaveBeenCalledTimes(1); expect(mockAdminClusterClientInstance.close).toHaveBeenCalledTimes(1);
expect(mockDataClusterClientInstance.close).toHaveBeenCalledTimes(1); expect(mockDataClusterClientInstance.close).toHaveBeenCalledTimes(1);
}); });
it('stops pollEsNodeVersions even if there are active subscriptions', async done => {
expect.assertions(2);
const mockAdminClusterClientInstance = elasticsearchServiceMock.createCustomClusterClient();
const mockDataClusterClientInstance = elasticsearchServiceMock.createCustomClusterClient();
MockClusterClient.mockImplementationOnce(
() => mockAdminClusterClientInstance
).mockImplementationOnce(() => mockDataClusterClientInstance);
mockAdminClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error());
const setupContract = await elasticsearchService.setup(deps);
setupContract.esNodesCompatibility$.subscribe(async () => {
expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
await elasticsearchService.stop();
await delay(100);
expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
done();
});
});
}); });

View file

@ -17,8 +17,17 @@
* under the License. * under the License.
*/ */
import { ConnectableObservable, Observable, Subscription } from 'rxjs'; import { ConnectableObservable, Observable, Subscription, Subject } from 'rxjs';
import { filter, first, map, publishReplay, switchMap, take } from 'rxjs/operators'; import {
filter,
first,
map,
publishReplay,
switchMap,
take,
shareReplay,
takeUntil,
} from 'rxjs/operators';
import { CoreService } from '../../types'; import { CoreService } from '../../types';
import { merge } from '../../utils'; import { merge } from '../../utils';
@ -47,13 +56,8 @@ interface SetupDeps {
export class ElasticsearchService implements CoreService<InternalElasticsearchServiceSetup> { export class ElasticsearchService implements CoreService<InternalElasticsearchServiceSetup> {
private readonly log: Logger; private readonly log: Logger;
private readonly config$: Observable<ElasticsearchConfig>; private readonly config$: Observable<ElasticsearchConfig>;
private subscriptions: { private subscription: Subscription | undefined;
client?: Subscription; private stop$ = new Subject();
esNodesCompatibility?: Subscription;
} = {
client: undefined,
esNodesCompatibility: undefined,
};
private kibanaVersion: string; private kibanaVersion: string;
constructor(private readonly coreContext: CoreContext) { constructor(private readonly coreContext: CoreContext) {
@ -69,7 +73,7 @@ export class ElasticsearchService implements CoreService<InternalElasticsearchSe
const clients$ = this.config$.pipe( const clients$ = this.config$.pipe(
filter(() => { filter(() => {
if (this.subscriptions.client !== undefined) { if (this.subscription !== undefined) {
this.log.error('Clients cannot be changed after they are created'); this.log.error('Clients cannot be changed after they are created');
return false; return false;
} }
@ -100,7 +104,7 @@ export class ElasticsearchService implements CoreService<InternalElasticsearchSe
publishReplay(1) publishReplay(1)
) as ConnectableObservable<CoreClusterClients>; ) as ConnectableObservable<CoreClusterClients>;
this.subscriptions.client = clients$.connect(); this.subscription = clients$.connect();
const config = await this.config$.pipe(first()).toPromise(); const config = await this.config$.pipe(first()).toPromise();
@ -164,18 +168,7 @@ export class ElasticsearchService implements CoreService<InternalElasticsearchSe
ignoreVersionMismatch: config.ignoreVersionMismatch, ignoreVersionMismatch: config.ignoreVersionMismatch,
esVersionCheckInterval: config.healthCheckDelay.asMilliseconds(), esVersionCheckInterval: config.healthCheckDelay.asMilliseconds(),
kibanaVersion: this.kibanaVersion, kibanaVersion: this.kibanaVersion,
}).pipe(publishReplay(1)); }).pipe(takeUntil(this.stop$), shareReplay({ refCount: true, bufferSize: 1 }));
this.subscriptions.esNodesCompatibility = (esNodesCompatibility$ as ConnectableObservable<
unknown
>).connect();
// TODO: Move to Status Service https://github.com/elastic/kibana/issues/41983
esNodesCompatibility$.subscribe(({ isCompatible, message }) => {
if (!isCompatible && message) {
this.log.error(message);
}
});
return { return {
legacy: { config$: clients$.pipe(map(clients => clients.config)) }, legacy: { config$: clients$.pipe(map(clients => clients.config)) },
@ -195,12 +188,10 @@ export class ElasticsearchService implements CoreService<InternalElasticsearchSe
public async stop() { public async stop() {
this.log.debug('Stopping elasticsearch service'); this.log.debug('Stopping elasticsearch service');
// TODO(TS-3.7-ESLINT) if (this.subscription !== undefined) {
// eslint-disable-next-line no-unused-expressions this.subscription.unsubscribe();
this.subscriptions.client?.unsubscribe(); }
// eslint-disable-next-line no-unused-expressions this.stop$.next();
this.subscriptions.esNodesCompatibility?.unsubscribe();
this.subscriptions = { client: undefined, esNodesCompatibility: undefined };
} }
private createClusterClient( private createClusterClient(

View file

@ -299,6 +299,14 @@ export class SavedObjectsService
this.logger.info( this.logger.info(
'Waiting until all Elasticsearch nodes are compatible with Kibana before starting saved objects migrations...' 'Waiting until all Elasticsearch nodes are compatible with Kibana before starting saved objects migrations...'
); );
// TODO: Move to Status Service https://github.com/elastic/kibana/issues/41983
this.setupDeps!.elasticsearch.esNodesCompatibility$.subscribe(({ isCompatible, message }) => {
if (!isCompatible && message) {
this.logger.error(message);
}
});
await this.setupDeps!.elasticsearch.esNodesCompatibility$.pipe( await this.setupDeps!.elasticsearch.esNodesCompatibility$.pipe(
filter(nodes => nodes.isCompatible), filter(nodes => nodes.isCompatible),
take(1) take(1)