mirror of
https://github.com/elastic/kibana.git
synced 2025-06-27 18:51:07 -04:00
Improve task manager functional tests in preparation for mget task claimer being the default (#196399)
Resolves https://github.com/elastic/kibana/issues/184942
Resolves https://github.com/elastic/kibana/issues/192023
Resolves https://github.com/elastic/kibana/issues/195573
In this PR, I'm reducing the flakiness found in our functional tests in
preparation for mget being the default task claimer that all these tests
run with (https://github.com/elastic/kibana/issues/194625). Because the
mget task claimer works differently and also polls more frequently, we
end up in situations where tasks run faster than they did with
update_by_query, creating more race conditions that are now fixed in
this PR.
Issues were surfaced via https://github.com/elastic/kibana/pull/190148
where I set `mget` as the default task claiming strategy.
Flaky test runs (some of these failed on other tests that are flaky):
-
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/7151
-
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/7169
-
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/7172
-
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/7175
-
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/7176
-
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/7185
(for
0fcf1ae689
)
This commit is contained in:
parent
6bfe8a7a9e
commit
3b8cf1236b
18 changed files with 174 additions and 60 deletions
|
@ -20,7 +20,7 @@ export const RetryForConflictsAttempts = 2;
|
|||
// note: we considered making this random, to help avoid a stampede, but
|
||||
// with 1 retry it probably doesn't matter, and adding randomness could
|
||||
// make it harder to diagnose issues
|
||||
const RetryForConflictsDelay = 250;
|
||||
const RetryForConflictsDelay = 100;
|
||||
|
||||
// retry an operation if it runs into 409 Conflict's, up to a limit
|
||||
export async function retryIfConflicts<T>(
|
||||
|
|
|
@ -22,7 +22,7 @@ export const RetryForConflictsAttempts = 2;
|
|||
// note: we considered making this random, to help avoid a stampede, but
|
||||
// with 1 retry it probably doesn't matter, and adding randomness could
|
||||
// make it harder to diagnose issues
|
||||
const RetryForConflictsDelay = 250;
|
||||
const RetryForConflictsDelay = 100;
|
||||
|
||||
// retry an operation if it runs into 409 Conflict's, up to a limit
|
||||
export async function retryIfConflicts<T>(
|
||||
|
|
|
@ -357,7 +357,10 @@ async function searchAvailableTasks({
|
|||
// Task must be enabled
|
||||
EnabledTask,
|
||||
// a task type that's not excluded (may be removed or not)
|
||||
OneOfTaskTypes('task.taskType', claimPartitions.unlimitedTypes),
|
||||
OneOfTaskTypes(
|
||||
'task.taskType',
|
||||
claimPartitions.unlimitedTypes.concat(Array.from(removedTypes))
|
||||
),
|
||||
// Either a task with idle status and runAt <= now or
|
||||
// status running or claiming with a retryAt <= now.
|
||||
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt),
|
||||
|
|
|
@ -132,6 +132,7 @@ function getIndexRecordActionType() {
|
|||
secrets,
|
||||
reference: params.reference,
|
||||
source: 'action:test.index-record',
|
||||
'@timestamp': new Date(),
|
||||
},
|
||||
});
|
||||
return { status: 'ok', actionId };
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
import { omit } from 'lodash';
|
||||
import type { Client } from '@elastic/elasticsearch';
|
||||
import { DeleteByQueryRequest } from '@elastic/elasticsearch/lib/api/types';
|
||||
|
||||
|
@ -61,6 +62,9 @@ export class ESTestIndexTool {
|
|||
group: {
|
||||
type: 'keyword',
|
||||
},
|
||||
'@timestamp': {
|
||||
type: 'date',
|
||||
},
|
||||
host: {
|
||||
properties: {
|
||||
hostname: {
|
||||
|
@ -109,6 +113,7 @@ export class ESTestIndexTool {
|
|||
async search(source: string, reference?: string) {
|
||||
const body = reference
|
||||
? {
|
||||
sort: [{ '@timestamp': 'asc' }],
|
||||
query: {
|
||||
bool: {
|
||||
must: [
|
||||
|
@ -127,6 +132,7 @@ export class ESTestIndexTool {
|
|||
},
|
||||
}
|
||||
: {
|
||||
sort: [{ '@timestamp': 'asc' }],
|
||||
query: {
|
||||
term: {
|
||||
source,
|
||||
|
@ -138,7 +144,16 @@ export class ESTestIndexTool {
|
|||
size: 1000,
|
||||
body,
|
||||
};
|
||||
return await this.es.search(params, { meta: true });
|
||||
const result = await this.es.search(params, { meta: true });
|
||||
result.body.hits.hits = result.body.hits.hits.map((hit) => {
|
||||
return {
|
||||
...hit,
|
||||
// Easier to remove @timestamp than to have all the downstream code ignore it
|
||||
// in their assertions
|
||||
_source: omit(hit._source as Record<string, unknown>, '@timestamp'),
|
||||
};
|
||||
});
|
||||
return result;
|
||||
}
|
||||
|
||||
async getAll(size: number = 10) {
|
||||
|
|
|
@ -125,7 +125,7 @@ export default function apiKeyBackfillTests({ getService }: FtrProviderContext)
|
|||
}
|
||||
|
||||
it('should wait to invalidate API key until backfill for rule is complete', async () => {
|
||||
const start = moment().utc().startOf('day').subtract(7, 'days').toISOString();
|
||||
const start = moment().utc().startOf('day').subtract(13, 'days').toISOString();
|
||||
const end = moment().utc().startOf('day').subtract(4, 'day').toISOString();
|
||||
const spaceId = SuperuserAtSpace1.space.id;
|
||||
|
||||
|
|
|
@ -1184,7 +1184,7 @@ instanceStateValue: true
|
|||
reference,
|
||||
overwrites: {
|
||||
enabled: false,
|
||||
schedule: { interval: '1s' },
|
||||
schedule: { interval: '1m' },
|
||||
},
|
||||
});
|
||||
|
||||
|
@ -1288,7 +1288,7 @@ instanceStateValue: true
|
|||
);
|
||||
|
||||
// @ts-expect-error doesnt handle total: number
|
||||
expect(searchResult.body.hits.total.value).to.eql(1);
|
||||
expect(searchResult.body.hits.total.value).to.be.greaterThan(0);
|
||||
// @ts-expect-error _source: unknown
|
||||
expect(searchResult.body.hits.hits[0]._source.params.message).to.eql(
|
||||
'Alerts, all:2, new:2 IDs:[1,2,], ongoing:0 IDs:[], recovered:0 IDs:[]'
|
||||
|
@ -1304,7 +1304,7 @@ instanceStateValue: true
|
|||
const response = await alertUtils.createAlwaysFiringRuleWithSummaryAction({
|
||||
reference,
|
||||
overwrites: {
|
||||
schedule: { interval: '1s' },
|
||||
schedule: { interval: '1h' },
|
||||
},
|
||||
notifyWhen: 'onActiveAlert',
|
||||
throttle: null,
|
||||
|
@ -1435,7 +1435,7 @@ instanceStateValue: true
|
|||
const response = await alertUtils.createAlwaysFiringRuleWithSummaryAction({
|
||||
reference,
|
||||
overwrites: {
|
||||
schedule: { interval: '1s' },
|
||||
schedule: { interval: '3s' },
|
||||
},
|
||||
notifyWhen: 'onThrottleInterval',
|
||||
throttle: '10s',
|
||||
|
|
|
@ -82,8 +82,8 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
.send(
|
||||
getTestRuleData({
|
||||
rule_type_id: 'test.patternFiring',
|
||||
schedule: { interval: '1s' },
|
||||
throttle: null,
|
||||
schedule: { interval: '2s' },
|
||||
throttle: '1s',
|
||||
params: {
|
||||
pattern,
|
||||
},
|
||||
|
@ -665,6 +665,10 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
status_change_threshold: 4,
|
||||
})
|
||||
.expect(200);
|
||||
|
||||
// wait so cache expires
|
||||
await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME);
|
||||
|
||||
const { body: createdAction } = await supertest
|
||||
.post(`${getUrlPrefix(space.id)}/api/actions/connector`)
|
||||
.set('kbn-xsrf', 'foo')
|
||||
|
@ -763,6 +767,10 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
status_change_threshold: 4,
|
||||
})
|
||||
.expect(200);
|
||||
|
||||
// wait so cache expires
|
||||
await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME);
|
||||
|
||||
const { body: createdAction } = await supertest
|
||||
.post(`${getUrlPrefix(space.id)}/api/actions/connector`)
|
||||
.set('kbn-xsrf', 'foo')
|
||||
|
@ -871,6 +879,10 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
status_change_threshold: 4,
|
||||
})
|
||||
.expect(200);
|
||||
|
||||
// wait so cache expires
|
||||
await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME);
|
||||
|
||||
const { body: createdAction } = await supertest
|
||||
.post(`${getUrlPrefix(space.id)}/api/actions/connector`)
|
||||
.set('kbn-xsrf', 'foo')
|
||||
|
@ -964,6 +976,10 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
status_change_threshold: 4,
|
||||
})
|
||||
.expect(200);
|
||||
|
||||
// wait so cache expires
|
||||
await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME);
|
||||
|
||||
const { body: createdAction } = await supertest
|
||||
.post(`${getUrlPrefix(space.id)}/api/actions/connector`)
|
||||
.set('kbn-xsrf', 'foo')
|
||||
|
@ -1067,6 +1083,10 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
status_change_threshold: 5,
|
||||
})
|
||||
.expect(200);
|
||||
|
||||
// wait so cache expires
|
||||
await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME);
|
||||
|
||||
const { body: createdAction } = await supertest
|
||||
.post(`${getUrlPrefix(space.id)}/api/actions/connector`)
|
||||
.set('kbn-xsrf', 'foo')
|
||||
|
@ -1166,6 +1186,10 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
status_change_threshold: 4,
|
||||
})
|
||||
.expect(200);
|
||||
|
||||
// wait so cache expires
|
||||
await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME);
|
||||
|
||||
const { body: createdAction } = await supertest
|
||||
.post(`${getUrlPrefix(space.id)}/api/actions/connector`)
|
||||
.set('kbn-xsrf', 'foo')
|
||||
|
@ -1192,7 +1216,8 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
.send(
|
||||
getTestRuleData({
|
||||
rule_type_id: 'test.patternFiring',
|
||||
schedule: { interval: '1s' },
|
||||
schedule: { interval: '2s' },
|
||||
notify_when: RuleNotifyWhen.THROTTLE,
|
||||
throttle: '1s',
|
||||
params: {
|
||||
pattern,
|
||||
|
@ -1263,6 +1288,10 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
status_change_threshold: 4,
|
||||
})
|
||||
.expect(200);
|
||||
|
||||
// wait so cache expires
|
||||
await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME);
|
||||
|
||||
const { body: createdAction } = await supertest
|
||||
.post(`${getUrlPrefix(space.id)}/api/actions/connector`)
|
||||
.set('kbn-xsrf', 'foo')
|
||||
|
@ -1289,7 +1318,7 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
.send(
|
||||
getTestRuleData({
|
||||
rule_type_id: 'test.patternFiring',
|
||||
schedule: { interval: '1s' },
|
||||
schedule: { interval: '2s' },
|
||||
throttle: null,
|
||||
notify_when: null,
|
||||
params: {
|
||||
|
@ -1302,8 +1331,7 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
params: {},
|
||||
frequency: {
|
||||
summary: false,
|
||||
throttle: '1s',
|
||||
notify_when: RuleNotifyWhen.THROTTLE,
|
||||
notify_when: RuleNotifyWhen.ACTIVE,
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -1312,8 +1340,7 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
params: {},
|
||||
frequency: {
|
||||
summary: false,
|
||||
throttle: '1s',
|
||||
notify_when: RuleNotifyWhen.THROTTLE,
|
||||
notify_when: RuleNotifyWhen.ACTIVE,
|
||||
},
|
||||
},
|
||||
],
|
||||
|
@ -1371,6 +1398,10 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
status_change_threshold: 4,
|
||||
})
|
||||
.expect(200);
|
||||
|
||||
// wait so cache expires
|
||||
await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME);
|
||||
|
||||
const { body: createdAction } = await supertest
|
||||
.post(`${getUrlPrefix(space.id)}/api/actions/connector`)
|
||||
.set('kbn-xsrf', 'foo')
|
||||
|
@ -1396,7 +1427,7 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
.send(
|
||||
getTestRuleData({
|
||||
rule_type_id: 'test.patternFiring',
|
||||
schedule: { interval: '1s' },
|
||||
schedule: { interval: '2s' },
|
||||
throttle: '1s',
|
||||
params: {
|
||||
pattern,
|
||||
|
@ -1463,6 +1494,10 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
status_change_threshold: 4,
|
||||
})
|
||||
.expect(200);
|
||||
|
||||
// wait so cache expires
|
||||
await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME);
|
||||
|
||||
const { body: createdAction } = await supertest
|
||||
.post(`${getUrlPrefix(space.id)}/api/actions/connector`)
|
||||
.set('kbn-xsrf', 'foo')
|
||||
|
@ -1488,7 +1523,7 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
.send(
|
||||
getTestRuleData({
|
||||
rule_type_id: 'test.patternFiring',
|
||||
schedule: { interval: '1s' },
|
||||
schedule: { interval: '2s' },
|
||||
throttle: null,
|
||||
notify_when: null,
|
||||
params: {
|
||||
|
@ -1501,8 +1536,7 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
params: {},
|
||||
frequency: {
|
||||
summary: false,
|
||||
throttle: '1s',
|
||||
notify_when: RuleNotifyWhen.THROTTLE,
|
||||
notify_when: RuleNotifyWhen.ACTIVE,
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -1511,8 +1545,7 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
params: {},
|
||||
frequency: {
|
||||
summary: false,
|
||||
throttle: '1s',
|
||||
notify_when: RuleNotifyWhen.THROTTLE,
|
||||
notify_when: RuleNotifyWhen.ACTIVE,
|
||||
},
|
||||
},
|
||||
],
|
||||
|
@ -1567,6 +1600,9 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
})
|
||||
.expect(200);
|
||||
|
||||
// wait so cache expires
|
||||
await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME);
|
||||
|
||||
// flap and then recover, then active again
|
||||
const instance = [true, false, true, false, true].concat(
|
||||
...new Array(6).fill(false),
|
||||
|
@ -1709,8 +1745,8 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
.send(
|
||||
getTestRuleData({
|
||||
rule_type_id: 'test.patternFiring',
|
||||
schedule: { interval: '1s' },
|
||||
throttle: null,
|
||||
schedule: { interval: '2s' },
|
||||
throttle: '1s',
|
||||
params: {
|
||||
pattern,
|
||||
},
|
||||
|
@ -1942,8 +1978,8 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
|
|||
provider: 'alerting',
|
||||
actions: new Map([
|
||||
// make sure the counts of the # of events per type are as expected
|
||||
['execute-start', { equal: 6 }],
|
||||
['execute', { equal: 6 }],
|
||||
['execute-start', { gte: 6 }],
|
||||
['execute', { gte: 6 }],
|
||||
['new-instance', { equal: 1 }],
|
||||
['active-instance', { equal: 2 }],
|
||||
['recovered-instance', { equal: 1 }],
|
||||
|
|
|
@ -35,9 +35,7 @@ export default function createAlertsAsDataFlappingTest({ getService }: FtrProvid
|
|||
|
||||
const alertsAsDataIndex = '.alerts-test.patternfiring.alerts-default';
|
||||
|
||||
// FLAKY: https://github.com/elastic/kibana/issues/195573
|
||||
// Failing: See https://github.com/elastic/kibana/issues/195573
|
||||
describe.skip('alerts as data flapping', function () {
|
||||
describe('alerts as data flapping', function () {
|
||||
this.tags('skipFIPS');
|
||||
beforeEach(async () => {
|
||||
await es.deleteByQuery({
|
||||
|
@ -712,6 +710,9 @@ export default function createAlertsAsDataFlappingTest({ getService }: FtrProvid
|
|||
})
|
||||
.expect(200);
|
||||
|
||||
// wait so cache expires
|
||||
await setTimeoutAsync(TEST_CACHE_EXPIRATION_TIME);
|
||||
|
||||
// Wait for the rule to run once
|
||||
let run = 1;
|
||||
let runWhichItFlapped = 0;
|
||||
|
@ -754,6 +755,11 @@ export default function createAlertsAsDataFlappingTest({ getService }: FtrProvid
|
|||
const searchResult = await es.search({
|
||||
index: alertsAsDataIndex,
|
||||
body: {
|
||||
sort: [
|
||||
{
|
||||
'@timestamp': 'desc',
|
||||
},
|
||||
],
|
||||
query: {
|
||||
bool: {
|
||||
must: {
|
||||
|
|
|
@ -110,11 +110,13 @@ export default function ruleTests({ getService }: FtrProviderContext) {
|
|||
});
|
||||
});
|
||||
|
||||
const { status, body: rule } = await supertest.get(
|
||||
`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule/${ruleId}`
|
||||
);
|
||||
expect(status).to.eql(200);
|
||||
expect(rule.execution_status.status).to.eql('active');
|
||||
await retry.try(async () => {
|
||||
const { status, body: rule } = await supertest.get(
|
||||
`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule/${ruleId}`
|
||||
);
|
||||
expect(status).to.eql(200);
|
||||
expect(rule.execution_status.status).to.eql('active');
|
||||
});
|
||||
});
|
||||
|
||||
it('still logs alert docs when rule exceeds timeout when cancelAlertsOnRuleTimeout is false on rule type', async () => {
|
||||
|
|
|
@ -92,7 +92,11 @@ export default function createNotifyWhenTests({ getService }: FtrProviderContext
|
|||
});
|
||||
});
|
||||
|
||||
const executeActionEvents = getEventsByAction(events, 'execute-action');
|
||||
// Slice in case the rule ran more times than we are asserting on
|
||||
const executeActionEvents = getEventsByAction(events, 'execute-action').slice(
|
||||
0,
|
||||
expectedActionGroupBasedOnPattern.length
|
||||
);
|
||||
const executeActionEventsActionGroup = executeActionEvents.map(
|
||||
(event) => event?.kibana?.alerting?.action_group_id
|
||||
);
|
||||
|
|
|
@ -49,10 +49,18 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
describe('task manager metrics', () => {
|
||||
describe('task claim', () => {
|
||||
it('should increment task claim success/total counters', async () => {
|
||||
// counters are reset every 30 seconds, so wait until the start of a
|
||||
// fresh counter cycle to make sure values are incrementing
|
||||
// reset metrics counter
|
||||
await getMetrics(true);
|
||||
const metricsResetTime = Date.now();
|
||||
// we've resetted the metrics and have 30 seconds before they reset again
|
||||
// wait for the first set of metrics to be returned after the reset
|
||||
const initialMetrics = (
|
||||
await getMetrics(false, (metrics) => metrics?.metrics?.task_claim?.value.total === 1)
|
||||
await getMetrics(
|
||||
false,
|
||||
(metrics) =>
|
||||
!!metrics?.metrics?.task_claim?.timestamp &&
|
||||
new Date(metrics?.metrics?.task_claim?.timestamp).getTime() > metricsResetTime
|
||||
)
|
||||
).metrics;
|
||||
expect(initialMetrics).not.to.be(null);
|
||||
expect(initialMetrics?.task_claim).not.to.be(null);
|
||||
|
@ -92,7 +100,7 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
const initialMetrics = (
|
||||
await getMetrics(
|
||||
false,
|
||||
(metrics) => metrics?.metrics?.task_claim?.value.total === initialCounterValue
|
||||
(metrics) => (metrics?.metrics?.task_claim?.value.total || 0) >= initialCounterValue
|
||||
)
|
||||
).metrics;
|
||||
expect(initialMetrics).not.to.be(null);
|
||||
|
@ -101,7 +109,10 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
|
||||
// retry until counter value resets
|
||||
const resetMetrics = (
|
||||
await getMetrics(false, (m: NodeMetrics) => m?.metrics?.task_claim?.value.total === 1)
|
||||
await getMetrics(
|
||||
false,
|
||||
(m: NodeMetrics) => (m?.metrics?.task_claim?.value.total || 0) >= 1
|
||||
)
|
||||
).metrics;
|
||||
expect(resetMetrics).not.to.be(null);
|
||||
expect(resetMetrics?.task_claim).not.to.be(null);
|
||||
|
@ -113,7 +124,7 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
const initialMetrics = (
|
||||
await getMetrics(
|
||||
false,
|
||||
(metrics) => metrics?.metrics?.task_claim?.value.total === initialCounterValue
|
||||
(metrics) => (metrics?.metrics?.task_claim?.value.total || 0) >= initialCounterValue
|
||||
)
|
||||
).metrics;
|
||||
expect(initialMetrics).not.to.be(null);
|
||||
|
@ -133,8 +144,8 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
expect(metrics?.task_claim).not.to.be(null);
|
||||
expect(metrics?.task_claim?.value).not.to.be(null);
|
||||
|
||||
expect(metrics?.task_claim?.value.success).to.equal(1);
|
||||
expect(metrics?.task_claim?.value.total).to.equal(1);
|
||||
expect(metrics?.task_claim?.value.success).to.be.greaterThan(0);
|
||||
expect(metrics?.task_claim?.value.total).to.be.greaterThan(0);
|
||||
|
||||
previousTaskClaimTimestamp = metrics?.task_claim?.timestamp!;
|
||||
|
||||
|
|
|
@ -797,7 +797,7 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
await retry.try(async () => {
|
||||
const [scheduledTask] = (await currentTasks()).docs;
|
||||
expect(scheduledTask.id).to.eql(task.id);
|
||||
expect(scheduledTask.status).to.eql('claiming');
|
||||
expect(['claiming', 'running'].includes(scheduledTask.status)).to.be(true);
|
||||
expect(scheduledTask.attempts).to.be.greaterThan(3);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -77,12 +77,16 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
}
|
||||
|
||||
it('should successfully schedule registered tasks, not claim unregistered tasks and mark removed task types as unrecognized', async () => {
|
||||
const testStart = new Date();
|
||||
const scheduledTask = await scheduleTask({
|
||||
taskType: 'sampleTask',
|
||||
schedule: { interval: `1s` },
|
||||
params: {},
|
||||
});
|
||||
|
||||
let scheduledTaskRuns = 0;
|
||||
let scheduledTaskInstanceRunAt = scheduledTask.runAt;
|
||||
|
||||
await retry.try(async () => {
|
||||
const tasks = (await currentTasks()).docs;
|
||||
expect(tasks.length).to.eql(3);
|
||||
|
@ -98,8 +102,16 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
);
|
||||
const removedTaskInstance = tasks.find((task) => task.id === REMOVED_TASK_TYPE_ID);
|
||||
|
||||
expect(scheduledTaskInstance?.status).to.eql('claiming');
|
||||
if (scheduledTaskInstance && scheduledTaskInstance.runAt !== scheduledTaskInstanceRunAt) {
|
||||
scheduledTaskRuns++;
|
||||
scheduledTaskInstanceRunAt = scheduledTaskInstance.runAt;
|
||||
}
|
||||
|
||||
expect(scheduledTaskRuns).to.be.greaterThan(2);
|
||||
expect(unregisteredTaskInstance?.status).to.eql('idle');
|
||||
expect(new Date(unregisteredTaskInstance?.runAt || testStart).getTime()).to.be.lessThan(
|
||||
testStart.getTime()
|
||||
);
|
||||
expect(removedTaskInstance?.status).to.eql('unrecognized');
|
||||
});
|
||||
});
|
||||
|
|
|
@ -371,6 +371,7 @@ export function initRoutes(
|
|||
do {
|
||||
const { docs: tasks } = await taskManager.fetch({
|
||||
query: taskManagerQuery,
|
||||
size: 1000,
|
||||
});
|
||||
tasksFound = tasks.length;
|
||||
await Promise.all(tasks.map((task) => taskManager.remove(task.id)));
|
||||
|
|
|
@ -6,9 +6,7 @@
|
|||
*/
|
||||
|
||||
import expect from '@kbn/expect';
|
||||
import url from 'url';
|
||||
import { keyBy, mapValues } from 'lodash';
|
||||
import supertest from 'supertest';
|
||||
import { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server';
|
||||
import { FtrProviderContext } from '../../ftr_provider_context';
|
||||
|
||||
|
@ -82,14 +80,13 @@ interface MonitoringStats {
|
|||
}
|
||||
|
||||
export default function ({ getService }: FtrProviderContext) {
|
||||
const config = getService('config');
|
||||
const retry = getService('retry');
|
||||
const request = supertest(url.format(config.get('servers.kibana')));
|
||||
const supertest = getService('supertest');
|
||||
|
||||
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
|
||||
|
||||
function getHealthRequest() {
|
||||
return request.get('/api/task_manager/_health').set('kbn-xsrf', 'foo');
|
||||
return supertest.get('/api/task_manager/_health').set('kbn-xsrf', 'foo');
|
||||
}
|
||||
|
||||
function getHealth(): Promise<MonitoringStats> {
|
||||
|
@ -114,7 +111,7 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
}
|
||||
|
||||
function scheduleTask(task: Partial<ConcreteTaskInstance>): Promise<ConcreteTaskInstance> {
|
||||
return request
|
||||
return supertest
|
||||
.post('/api/sample_tasks/schedule')
|
||||
.set('kbn-xsrf', 'xxx')
|
||||
.send({ task })
|
||||
|
@ -125,6 +122,11 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
const monitoredAggregatedStatsRefreshRate = 5000;
|
||||
|
||||
describe('health', () => {
|
||||
afterEach(async () => {
|
||||
// clean up after each test
|
||||
return await supertest.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200);
|
||||
});
|
||||
|
||||
it('should return basic configuration of task manager', async () => {
|
||||
const health = await getHealth();
|
||||
expect(health.status).to.eql('OK');
|
||||
|
|
|
@ -469,8 +469,7 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
});
|
||||
});
|
||||
|
||||
// always failing
|
||||
it.skip('should only run as many instances of a task as its maxConcurrency will allow', async () => {
|
||||
it('should only run as many instances of a task as its maxConcurrency will allow', async () => {
|
||||
// should run as there's only one and maxConcurrency on this TaskType is 1
|
||||
const firstWithSingleConcurrency = await scheduleTask({
|
||||
taskType: 'sampleTaskWithSingleConcurrency',
|
||||
|
@ -762,18 +761,24 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
});
|
||||
});
|
||||
|
||||
// flaky
|
||||
it.skip('should continue claiming recurring task even if maxAttempts has been reached', async () => {
|
||||
it('should continue claiming recurring task even if maxAttempts has been reached', async () => {
|
||||
const task = await scheduleTask({
|
||||
taskType: 'sampleRecurringTaskTimingOut',
|
||||
schedule: { interval: '1s' },
|
||||
params: {},
|
||||
});
|
||||
|
||||
let taskRuns = 0;
|
||||
let taskRetryAt = task.retryAt;
|
||||
|
||||
await retry.try(async () => {
|
||||
const [scheduledTask] = (await currentTasks()).docs;
|
||||
expect(scheduledTask.id).to.eql(task.id);
|
||||
expect(scheduledTask.status).to.eql('claiming');
|
||||
if (scheduledTask.retryAt !== taskRetryAt) {
|
||||
taskRuns++;
|
||||
taskRetryAt = scheduledTask.retryAt;
|
||||
}
|
||||
expect(taskRuns).to.be.greaterThan(3);
|
||||
expect(scheduledTask.attempts).to.be.greaterThan(3);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -56,6 +56,11 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
await esArchiver.unload('x-pack/test/functional/es_archives/task_manager_removed_types');
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
// clean up after each test
|
||||
return await request.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200);
|
||||
});
|
||||
|
||||
function scheduleTask(
|
||||
task: Partial<ConcreteTaskInstance | DeprecatedConcreteTaskInstance>
|
||||
): Promise<SerializedConcreteTaskInstance> {
|
||||
|
@ -76,14 +81,17 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
.then((response) => response.body);
|
||||
}
|
||||
|
||||
// flaky
|
||||
it.skip('should successfully schedule registered tasks, not claim unregistered tasks and mark removed task types as unrecognized', async () => {
|
||||
it('should successfully schedule registered tasks, not claim unregistered tasks and mark removed task types as unrecognized', async () => {
|
||||
const testStart = new Date();
|
||||
const scheduledTask = await scheduleTask({
|
||||
taskType: 'sampleTask',
|
||||
schedule: { interval: `1s` },
|
||||
params: {},
|
||||
});
|
||||
|
||||
let scheduledTaskRuns = 0;
|
||||
let scheduledTaskInstanceRunAt = scheduledTask.runAt;
|
||||
|
||||
await retry.try(async () => {
|
||||
const tasks = (await currentTasks()).docs;
|
||||
expect(tasks.length).to.eql(3);
|
||||
|
@ -99,8 +107,16 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
);
|
||||
const removedTaskInstance = tasks.find((task) => task.id === REMOVED_TASK_TYPE_ID);
|
||||
|
||||
expect(scheduledTaskInstance?.status).to.eql('claiming');
|
||||
if (scheduledTaskInstance && scheduledTaskInstance.runAt !== scheduledTaskInstanceRunAt) {
|
||||
scheduledTaskRuns++;
|
||||
scheduledTaskInstanceRunAt = scheduledTaskInstance.runAt;
|
||||
}
|
||||
|
||||
expect(scheduledTaskRuns).to.be.greaterThan(2);
|
||||
expect(unregisteredTaskInstance?.status).to.eql('idle');
|
||||
expect(new Date(unregisteredTaskInstance?.runAt || testStart).getTime()).to.be.lessThan(
|
||||
testStart.getTime()
|
||||
);
|
||||
expect(removedTaskInstance?.status).to.eql('unrecognized');
|
||||
});
|
||||
});
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue