mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
Connector token client fixes (#184550)
This commit is contained in:
parent
aa109676fa
commit
d9aa9893c6
4 changed files with 170 additions and 45 deletions
|
@ -11,6 +11,7 @@ import { encryptedSavedObjectsMock } from '@kbn/encrypted-saved-objects-plugin/s
|
|||
import { ConnectorTokenClient } from './connector_token_client';
|
||||
import { Logger } from '@kbn/core/server';
|
||||
import { ConnectorToken } from '../types';
|
||||
import * as allRetry from './retry_if_conflicts';
|
||||
|
||||
const logger = loggingSystemMock.create().get() as jest.Mocked<Logger>;
|
||||
jest.mock('@kbn/core-saved-objects-utils-server', () => {
|
||||
|
@ -301,30 +302,47 @@ describe('update()', () => {
|
|||
},
|
||||
references: [],
|
||||
});
|
||||
unsecuredSavedObjectsClient.checkConflicts.mockResolvedValueOnce({
|
||||
errors: [
|
||||
{
|
||||
id: '1',
|
||||
error: {
|
||||
error: 'error',
|
||||
statusCode: 503,
|
||||
message: 'There is a conflict.',
|
||||
},
|
||||
type: 'conflict',
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
const result = await connectorTokenClient.update({
|
||||
id: '1',
|
||||
tokenType: 'access_token',
|
||||
token: 'testtokenvalue',
|
||||
expiresAtMillis: expiresAt,
|
||||
});
|
||||
expect(result).toEqual(null);
|
||||
expect(unsecuredSavedObjectsClient.create).toHaveBeenCalledTimes(0);
|
||||
const retryIfConflictsMock = jest.spyOn(allRetry, 'retryIfConflicts');
|
||||
retryIfConflictsMock.mockRejectedValue(new Error('There is a conflict.'));
|
||||
await expect(
|
||||
connectorTokenClient.update({
|
||||
id: '1',
|
||||
tokenType: 'access_token',
|
||||
token: 'testtokenvalue',
|
||||
expiresAtMillis: expiresAt,
|
||||
})
|
||||
).rejects.toThrowErrorMatchingInlineSnapshot(`"There is a conflict."`);
|
||||
expect(logger.error.mock.calls[0]).toMatchObject([
|
||||
'Failed to update connector_token for id "1" and tokenType: "access_token". Error: There is a conflict. ',
|
||||
'Failed to update connector_token for id "1" and tokenType: "access_token". Error: There is a conflict.',
|
||||
]);
|
||||
});
|
||||
|
||||
test('should attempt oper', async () => {
|
||||
const expiresAt = new Date().toISOString();
|
||||
|
||||
unsecuredSavedObjectsClient.get.mockResolvedValueOnce({
|
||||
id: '1',
|
||||
type: 'connector_token',
|
||||
attributes: {
|
||||
connectorId: '123',
|
||||
tokenType: 'access_token',
|
||||
token: 'testtokenvalue',
|
||||
createdAt: new Date().toISOString(),
|
||||
},
|
||||
references: [],
|
||||
});
|
||||
const retryIfConflictsMock = jest.spyOn(allRetry, 'retryIfConflicts');
|
||||
retryIfConflictsMock.mockRejectedValue(new Error('There is a conflict.'));
|
||||
await expect(
|
||||
connectorTokenClient.update({
|
||||
id: '1',
|
||||
tokenType: 'access_token',
|
||||
token: 'testtokenvalue',
|
||||
expiresAtMillis: expiresAt,
|
||||
})
|
||||
).rejects.toThrowErrorMatchingInlineSnapshot(`"There is a conflict."`);
|
||||
expect(logger.error.mock.calls[0]).toMatchObject([
|
||||
'Failed to update connector_token for id "1" and tokenType: "access_token". Error: There is a conflict.',
|
||||
]);
|
||||
});
|
||||
|
||||
|
@ -560,9 +578,7 @@ describe('updateOrReplace()', () => {
|
|||
},
|
||||
references: [],
|
||||
});
|
||||
unsecuredSavedObjectsClient.checkConflicts.mockResolvedValueOnce({
|
||||
errors: [],
|
||||
});
|
||||
|
||||
unsecuredSavedObjectsClient.create.mockResolvedValueOnce({
|
||||
id: '1',
|
||||
type: 'connector_token',
|
||||
|
@ -594,7 +610,6 @@ describe('updateOrReplace()', () => {
|
|||
expect(unsecuredSavedObjectsClient.delete).not.toHaveBeenCalled();
|
||||
|
||||
expect(unsecuredSavedObjectsClient.get).toHaveBeenCalledTimes(1);
|
||||
expect(unsecuredSavedObjectsClient.checkConflicts).toHaveBeenCalledTimes(1);
|
||||
expect(unsecuredSavedObjectsClient.create).toHaveBeenCalledTimes(1);
|
||||
expect((unsecuredSavedObjectsClient.create.mock.calls[0][1] as ConnectorToken).token).toBe(
|
||||
'newToken'
|
||||
|
|
|
@ -8,10 +8,12 @@
|
|||
import { omitBy, isUndefined } from 'lodash';
|
||||
import { EncryptedSavedObjectsClient } from '@kbn/encrypted-saved-objects-plugin/server';
|
||||
import { Logger, SavedObjectsClientContract, SavedObjectsUtils } from '@kbn/core/server';
|
||||
import { retryIfConflicts } from './retry_if_conflicts';
|
||||
import { ConnectorToken } from '../types';
|
||||
import { CONNECTOR_TOKEN_SAVED_OBJECT_TYPE } from '../constants/saved_objects';
|
||||
|
||||
export const MAX_TOKENS_RETURNED = 1;
|
||||
const MAX_RETRY_ATTEMPTS = 3;
|
||||
|
||||
interface ConstructorOptions {
|
||||
encryptedSavedObjectsClient: EncryptedSavedObjectsClient;
|
||||
|
@ -107,22 +109,10 @@ export class ConnectorTokenClient {
|
|||
id
|
||||
);
|
||||
const createTime = Date.now();
|
||||
const conflicts = await this.unsecuredSavedObjectsClient.checkConflicts([
|
||||
{ id, type: 'connector_token' },
|
||||
]);
|
||||
|
||||
try {
|
||||
if (conflicts.errors.length > 0) {
|
||||
this.logger.error(
|
||||
`Failed to update connector_token for id "${id}" and tokenType: "${
|
||||
tokenType ?? 'access_token'
|
||||
}". ${conflicts.errors.reduce(
|
||||
(messages, errorObj) => `Error: ${errorObj.error.message} ${messages}`,
|
||||
''
|
||||
)}`
|
||||
);
|
||||
return null;
|
||||
} else {
|
||||
const result = await this.unsecuredSavedObjectsClient.create<ConnectorToken>(
|
||||
const updateOperation = () => {
|
||||
return this.unsecuredSavedObjectsClient.create<ConnectorToken>(
|
||||
CONNECTOR_TOKEN_SAVED_OBJECT_TYPE,
|
||||
{
|
||||
...attributes,
|
||||
|
@ -141,8 +131,16 @@ export class ConnectorTokenClient {
|
|||
isUndefined
|
||||
)
|
||||
);
|
||||
return result.attributes as ConnectorToken;
|
||||
}
|
||||
};
|
||||
|
||||
const result = await retryIfConflicts(
|
||||
this.logger,
|
||||
`accessToken.create('${id}')`,
|
||||
updateOperation,
|
||||
MAX_RETRY_ATTEMPTS
|
||||
);
|
||||
|
||||
return result.attributes as ConnectorToken;
|
||||
} catch (err) {
|
||||
this.logger.error(
|
||||
`Failed to update connector_token for id "${id}" and tokenType: "${
|
||||
|
@ -178,7 +176,7 @@ export class ConnectorTokenClient {
|
|||
perPage: MAX_TOKENS_RETURNED,
|
||||
type: CONNECTOR_TOKEN_SAVED_OBJECT_TYPE,
|
||||
filter: `${CONNECTOR_TOKEN_SAVED_OBJECT_TYPE}.attributes.connectorId: "${connectorId}"${tokenTypeFilter}`,
|
||||
sortField: 'updatedAt',
|
||||
sortField: 'updated_at',
|
||||
sortOrder: 'desc',
|
||||
})
|
||||
).saved_objects
|
||||
|
|
57
x-pack/plugins/actions/server/lib/retry_if_conflicts.test.ts
Normal file
57
x-pack/plugins/actions/server/lib/retry_if_conflicts.test.ts
Normal file
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
import { Logger, SavedObjectsErrorHelpers } from '@kbn/core/server';
|
||||
import { retryIfConflicts, RetryForConflictsAttempts } from './retry_if_conflicts';
|
||||
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
|
||||
|
||||
jest.mock('@kbn/core/server');
|
||||
|
||||
const mockLogger = loggingSystemMock.create().get() as jest.Mocked<Logger>;
|
||||
|
||||
describe('retryIfConflicts', () => {
|
||||
let logger: Logger;
|
||||
|
||||
beforeEach(() => {
|
||||
logger = mockLogger;
|
||||
(SavedObjectsErrorHelpers.isConflictError as jest.Mock).mockReturnValue(true);
|
||||
});
|
||||
|
||||
it('should execute operation successfully without conflicts', async () => {
|
||||
const operation = jest.fn().mockResolvedValue('success');
|
||||
const result = await retryIfConflicts(logger, 'testOperation', operation);
|
||||
expect(result).toBe('success');
|
||||
expect(operation).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('should retry the operation on conflict error', async () => {
|
||||
const operation = jest.fn().mockRejectedValueOnce('conflict').mockResolvedValueOnce('success');
|
||||
|
||||
const result = await retryIfConflicts(logger, 'testOperation', operation);
|
||||
expect(result).toBe('success');
|
||||
expect(operation).toHaveBeenCalledTimes(2);
|
||||
expect(logger.debug).toHaveBeenCalledWith('testOperation conflict, retrying ...');
|
||||
});
|
||||
|
||||
it('should throw error if maximum retries exceeded', async () => {
|
||||
const operation = jest.fn().mockRejectedValue('conflict');
|
||||
|
||||
await expect(retryIfConflicts(logger, 'testOperation', operation)).rejects.toBe('conflict');
|
||||
expect(operation).toHaveBeenCalledTimes(RetryForConflictsAttempts + 1);
|
||||
expect(logger.warn).toHaveBeenCalledWith('testOperation conflict, exceeded retries');
|
||||
});
|
||||
|
||||
it('should throw non-conflict error immediately', async () => {
|
||||
(SavedObjectsErrorHelpers.isConflictError as jest.Mock).mockReturnValue(false);
|
||||
const nonConflictError = new Error('non-conflict error');
|
||||
const operation = jest.fn().mockRejectedValue(nonConflictError);
|
||||
|
||||
await expect(retryIfConflicts(logger, 'testOperation', operation)).rejects.toThrow(
|
||||
nonConflictError
|
||||
);
|
||||
expect(operation).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
55
x-pack/plugins/actions/server/lib/retry_if_conflicts.ts
Normal file
55
x-pack/plugins/actions/server/lib/retry_if_conflicts.ts
Normal file
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
// This module provides a helper to perform retries on a function if the
|
||||
// function ends up throwing a SavedObject 409 conflict.
|
||||
// This is a copy of the retryIfConflicts function from the alerting plugin
|
||||
|
||||
import { Logger, SavedObjectsErrorHelpers } from '@kbn/core/server';
|
||||
|
||||
type RetryableForConflicts<T> = () => Promise<T>;
|
||||
|
||||
// number of times to retry when conflicts occur
|
||||
export const RetryForConflictsAttempts = 2;
|
||||
|
||||
// milliseconds to wait before retrying when conflicts occur
|
||||
// note: we considered making this random, to help avoid a stampede, but
|
||||
// with 1 retry it probably doesn't matter, and adding randomness could
|
||||
// make it harder to diagnose issues
|
||||
const RetryForConflictsDelay = 250;
|
||||
|
||||
// retry an operation if it runs into 409 Conflict's, up to a limit
|
||||
export async function retryIfConflicts<T>(
|
||||
logger: Logger,
|
||||
name: string,
|
||||
operation: RetryableForConflicts<T>,
|
||||
retries: number = RetryForConflictsAttempts
|
||||
): Promise<T> {
|
||||
// run the operation, return if no errors or throw if not a conflict error
|
||||
try {
|
||||
return await operation();
|
||||
} catch (err) {
|
||||
if (!SavedObjectsErrorHelpers.isConflictError(err)) {
|
||||
throw err;
|
||||
}
|
||||
|
||||
// must be a conflict; if no retries left, throw it
|
||||
if (retries <= 0) {
|
||||
logger.warn(`${name} conflict, exceeded retries`);
|
||||
throw err;
|
||||
}
|
||||
|
||||
// delay a bit before retrying
|
||||
logger.debug(`${name} conflict, retrying ...`);
|
||||
await waitBeforeNextRetry();
|
||||
return await retryIfConflicts(logger, name, operation, retries - 1);
|
||||
}
|
||||
}
|
||||
|
||||
async function waitBeforeNextRetry(): Promise<void> {
|
||||
await new Promise((resolve) => setTimeout(resolve, RetryForConflictsDelay));
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue