[Code] Use seq_no and primary_term to do ES concurrency control for EsQueue and reenable code tests (#41377) (#41399)

* Revert "disable all code tests (#41363)"

This reverts commit 771d70c334.

* Revert "[code] disable failing tests (#41336) (#41338) (#41337)"

This reverts commit 62fedccad5.

* [Code] Use seq_no and primary_term to do ES concurrency control for queue
This commit is contained in:
Mengwei Ding 2019-07-17 14:00:28 -07:00 committed by GitHub
parent 4645529eec
commit 504963284b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -17,7 +17,6 @@ const puid = new Puid();
function formatJobObject(job) {
return {
index: job._index,
type: job._type,
id: job._id,
// Expose the payload of the job even when the job failed/timeout
...job._source.payload,
@ -125,9 +124,9 @@ export class Worker extends events.EventEmitter {
return this.client.update({
index: job._index,
type: job._type,
id: job._id,
version: job._version,
if_seq_no: job._seq_no,
if_primary_term: job._primary_term,
body: { doc }
})
.then((response) => {
@ -168,9 +167,9 @@ export class Worker extends events.EventEmitter {
return this.client.update({
index: job._index,
type: job._type,
id: job._id,
version: job._version,
if_seq_no: job._seq_no,
if_primary_term: job._primary_term,
body: { doc }
})
.then(() => true)
@ -193,7 +192,6 @@ export class Worker extends events.EventEmitter {
return this.client.update({
index: job._index,
type: job._type,
id: job._id,
version: job._version,
body: { doc }
@ -276,9 +274,9 @@ export class Worker extends events.EventEmitter {
return this.client.update({
index: job._index,
type: job._type,
id: job._id,
version: job._version,
if_seq_no: job._seq_no,
if_primary_term: job._primary_term,
body: { doc }
})
.then(() => {
@ -425,6 +423,7 @@ export class Worker extends events.EventEmitter {
_getPendingJobs() {
const nowTime = moment().toISOString();
const query = {
seq_no_primary_term: true,
_source: {
excludes: [ 'output.content' ]
},