mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 01:38:56 -04:00
[cluster/worker] use binders to manage event handlers
This commit is contained in:
parent
cc65d8958c
commit
5265f0605f
1 changed files with 35 additions and 24 deletions
|
@ -1,9 +1,9 @@
|
|||
import _ from 'lodash';
|
||||
import cluster from 'cluster';
|
||||
let { resolve } = require('path');
|
||||
let { EventEmitter } = require('events');
|
||||
import { resolve } from 'path';
|
||||
import { EventEmitter } from 'events';
|
||||
|
||||
import fromRoot from '../../utils/fromRoot';
|
||||
import { BinderFor, fromRoot } from '../../utils';
|
||||
|
||||
let cliPath = fromRoot('src/cli');
|
||||
let baseArgs = _.difference(process.argv.slice(2), ['--no-watch']);
|
||||
|
@ -18,13 +18,6 @@ let dead = fork => {
|
|||
return fork.isDead() || fork.killed;
|
||||
};
|
||||
|
||||
let kill = fork => {
|
||||
// fork.kill() waits for process to disconnect, but causes occasional
|
||||
// "ipc disconnected" errors and is too slow for the proc's "exit" event
|
||||
fork.process.kill();
|
||||
fork.killed = true;
|
||||
};
|
||||
|
||||
module.exports = class Worker extends EventEmitter {
|
||||
constructor(opts) {
|
||||
opts = opts || {};
|
||||
|
@ -39,23 +32,29 @@ module.exports = class Worker extends EventEmitter {
|
|||
this.listening = false;
|
||||
this.changes = [];
|
||||
|
||||
this.forkBinder = null; // defined when the fork is
|
||||
this.clusterBinder = new BinderFor(cluster);
|
||||
this.processBinder = new BinderFor(process);
|
||||
|
||||
let argv = _.union(baseArgv, opts.argv || []);
|
||||
this.env = {
|
||||
kbnWorkerType: this.type,
|
||||
kbnWorkerArgv: JSON.stringify(argv)
|
||||
};
|
||||
|
||||
_.bindAll(this, ['onExit', 'onMessage', 'onOnline', 'onDisconnect', 'shutdown', 'start']);
|
||||
}
|
||||
|
||||
onExit(fork, code) {
|
||||
if (this.fork !== fork) return;
|
||||
|
||||
// we have our fork's exit, so stop listening for others
|
||||
this.clusterBinder.destroy();
|
||||
|
||||
// our fork is gone, clear our ref so we don't try to talk to it anymore
|
||||
this.fork = null;
|
||||
this.forkBinder = null;
|
||||
|
||||
this.online = false;
|
||||
this.listening = false;
|
||||
cluster.removeListener('exit', this.onExit);
|
||||
this.emit('fork:exit');
|
||||
|
||||
if (code) {
|
||||
|
@ -75,12 +74,17 @@ module.exports = class Worker extends EventEmitter {
|
|||
|
||||
async shutdown() {
|
||||
if (this.fork && !dead(this.fork)) {
|
||||
kill(this.fork);
|
||||
this.fork.removeListener('message', this.parseIncomingMessage);
|
||||
this.fork.removeListener('online', this.onOnline);
|
||||
this.fork.removeListener('disconnect', this.onDisconnect);
|
||||
process.removeListener('exit', this.shutdown);
|
||||
// kill the fork
|
||||
this.fork.process.kill();
|
||||
this.fork.killed = true;
|
||||
|
||||
// stop listening to the fork, it's just going to die
|
||||
this.forkBinder.destroy();
|
||||
|
||||
// we don't need to react to process.exit anymore
|
||||
this.processBinder.destroy();
|
||||
|
||||
// wait until the cluster reports this fork has exitted, then resolve
|
||||
await new Promise(cb => this.once('fork:exit', cb));
|
||||
}
|
||||
}
|
||||
|
@ -121,8 +125,8 @@ module.exports = class Worker extends EventEmitter {
|
|||
}
|
||||
|
||||
async start() {
|
||||
// once "exit" event is received with 0 status, start() is called again
|
||||
if (this.fork) {
|
||||
// once "exit" event is received with 0 status, start() is called again
|
||||
this.shutdown();
|
||||
await new Promise(cb => this.once('online', cb));
|
||||
return;
|
||||
|
@ -136,13 +140,20 @@ module.exports = class Worker extends EventEmitter {
|
|||
}
|
||||
|
||||
this.fork = cluster.fork(this.env);
|
||||
this.fork.on('message', this.parseIncomingMessage);
|
||||
this.fork.on('online', this.onOnline);
|
||||
this.fork.on('disconnect', this.onDisconnect);
|
||||
this.forkBinder = new BinderFor(this.fork);
|
||||
|
||||
process.on('exit', this.shutdown);
|
||||
cluster.on('exit', this.onExit);
|
||||
// when the fork sends a message, comes online, or looses it's connection, then react
|
||||
this.forkBinder.on('message', (msg) => this.parseIncomingMessage(msg));
|
||||
this.forkBinder.on('online', () => this.onOnline());
|
||||
this.forkBinder.on('disconnect', () => this.onDisconnect());
|
||||
|
||||
// when the cluster says a fork has exitted, check if it is ours
|
||||
this.clusterBinder.on('exit', (fork, code) => this.onExit(fork, code));
|
||||
|
||||
// when the process exits, make sure we kill our workers
|
||||
this.processBinder.on('exit', () => this.shutdown());
|
||||
|
||||
// wait for the fork to report it is online before resolving
|
||||
await new Promise(cb => this.once('fork:online', cb));
|
||||
}
|
||||
};
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue