Merge pull request #5867 from spalger/implement/clusterWorkerListening

Track when a cluster worker is listening
This commit is contained in:
Spencer 2016-03-29 19:50:48 -07:00
commit 43c52c7484
19 changed files with 404 additions and 44 deletions

View file

@ -0,0 +1,44 @@
import EventEmitter from 'events';
import { assign, random } from 'lodash';
import sinon from 'sinon';
import cluster from 'cluster';
import { delay } from 'bluebird';
export default class MockClusterFork extends EventEmitter {
constructor() {
super();
let dead = true;
function wait() {
return delay(random(10, 250));
}
assign(this, {
process: {
kill: sinon.spy(() => {
(async () => {
await wait();
this.emit('disconnect');
await wait();
dead = true;
this.emit('exit');
cluster.emit('exit', this, this.exitCode || 0);
}());
}),
},
isDead: sinon.spy(() => dead),
send: sinon.stub()
});
sinon.spy(this, 'on');
sinon.spy(this, 'removeListener');
sinon.spy(this, 'emit');
(async () => {
await wait();
dead = false;
this.emit('online');
}());
}
}

View file

@ -0,0 +1,59 @@
import expect from 'expect.js';
import sinon from 'auto-release-sinon';
import cluster from 'cluster';
import { ChildProcess } from 'child_process';
import { sample, difference } from 'lodash';
import ClusterManager from '../cluster_manager';
import Worker from '../worker';
describe('CLI cluster manager', function () {
function setup() {
sinon.stub(cluster, 'fork', function () {
return {
process: {
kill: sinon.stub(),
},
isDead: sinon.stub().returns(false),
removeListener: sinon.stub(),
on: sinon.stub(),
send: sinon.stub()
};
});
const manager = new ClusterManager({});
return manager;
}
it('has two workers', function () {
const manager = setup();
expect(manager.workers).to.have.length(2);
for (const worker of manager.workers) expect(worker).to.be.a(Worker);
expect(manager.optimizer).to.be.a(Worker);
expect(manager.server).to.be.a(Worker);
});
it('delivers broadcast messages to other workers', function () {
const manager = setup();
for (const worker of manager.workers) {
Worker.prototype.start.call(worker);// bypass the debounced start method
worker.onOnline();
}
const football = {};
const messenger = sample(manager.workers);
messenger.emit('broadcast', football);
for (const worker of manager.workers) {
if (worker === messenger) {
expect(worker.fork.send.callCount).to.be(0);
} else {
expect(worker.fork.send.firstCall.args[0]).to.be(football);
}
}
});
});

View file

@ -0,0 +1,198 @@
import expect from 'expect.js';
import sinon from 'auto-release-sinon';
import cluster from 'cluster';
import { ChildProcess } from 'child_process';
import { difference, findIndex, sample } from 'lodash';
import { fromNode as fn } from 'bluebird';
import MockClusterFork from './_mock_cluster_fork';
import Worker from '../worker';
const workersToShutdown = [];
function assertListenerAdded(emitter, event) {
sinon.assert.calledWith(emitter.on, event);
}
function assertListenerRemoved(emitter, event) {
sinon.assert.calledWith(
emitter.removeListener,
event,
emitter.on.args[findIndex(emitter.on.args, { 0: event })][1]
);
}
function setup(opts = {}) {
sinon.stub(cluster, 'fork', function () {
return new MockClusterFork();
});
const worker = new Worker(opts);
workersToShutdown.push(worker);
return worker;
}
describe('CLI cluster manager', function () {
afterEach(async function () {
for (const worker of workersToShutdown) {
if (worker.shutdown.restore) {
// if the shutdown method was stubbed, restore it first
worker.shutdown.restore();
}
await worker.shutdown();
}
});
describe('#onChange', function () {
context('opts.watch = true', function () {
it('restarts the fork', function () {
const worker = setup({ watch: true });
sinon.stub(worker, 'start');
worker.onChange('/some/path');
expect(worker.changes).to.eql(['/some/path']);
sinon.assert.calledOnce(worker.start);
});
});
context('opts.watch = false', function () {
it('does not restart the fork', function () {
const worker = setup({ watch: false });
sinon.stub(worker, 'start');
worker.onChange('/some/path');
expect(worker.changes).to.eql([]);
sinon.assert.notCalled(worker.start);
});
});
});
describe('#shutdown', function () {
context('after starting()', function () {
it('kills the worker and unbinds from message, online, and disconnect events', async function () {
const worker = setup();
await worker.start();
expect(worker).to.have.property('online', true);
const fork = worker.fork;
sinon.assert.notCalled(fork.process.kill);
assertListenerAdded(fork, 'message');
assertListenerAdded(fork, 'online');
assertListenerAdded(fork, 'disconnect');
worker.shutdown();
sinon.assert.calledOnce(fork.process.kill);
assertListenerRemoved(fork, 'message');
assertListenerRemoved(fork, 'online');
assertListenerRemoved(fork, 'disconnect');
});
});
context('before being started', function () {
it('does nothing', function () {
const worker = setup();
worker.shutdown();
});
});
});
describe('#parseIncomingMessage()', function () {
context('on a started worker', function () {
it(`is bound to fork's message event`, async function () {
const worker = setup();
await worker.start();
sinon.assert.calledWith(worker.fork.on, 'message');
});
});
it('ignores non-array messsages', function () {
const worker = setup();
worker.parseIncomingMessage('some string thing');
worker.parseIncomingMessage(0);
worker.parseIncomingMessage(null);
worker.parseIncomingMessage(undefined);
worker.parseIncomingMessage({ like: 'an object' });
worker.parseIncomingMessage(/weird/);
});
it('calls #onMessage with message parts', function () {
const worker = setup();
const stub = sinon.stub(worker, 'onMessage');
worker.parseIncomingMessage([10, 100, 1000, 10000]);
sinon.assert.calledWith(stub, 10, 100, 1000, 10000);
});
});
describe('#onMessage', function () {
context('when sent WORKER_BROADCAST message', function () {
it('emits the data to be broadcasted', function () {
const worker = setup();
const data = {};
const stub = sinon.stub(worker, 'emit');
worker.onMessage('WORKER_BROADCAST', data);
sinon.assert.calledWithExactly(stub, 'broadcast', data);
});
});
context('when sent WORKER_LISTENING message', function () {
it('sets the listening flag and emits the listening event', function () {
const worker = setup();
const data = {};
const stub = sinon.stub(worker, 'emit');
expect(worker).to.have.property('listening', false);
worker.onMessage('WORKER_LISTENING');
expect(worker).to.have.property('listening', true);
sinon.assert.calledWithExactly(stub, 'listening');
});
});
context('when passed an unkown message', function () {
it('does nothing', function () {
const worker = setup();
worker.onMessage('asdlfkajsdfahsdfiohuasdofihsdoif');
worker.onMessage({});
worker.onMessage(23049283094);
});
});
});
describe('#start', function () {
context('when not started', function () {
it('creates a fork and waits for it to come online', async function () {
const worker = setup();
sinon.spy(worker, 'on');
await worker.start();
sinon.assert.calledOnce(cluster.fork);
sinon.assert.calledWith(worker.on, 'fork:online');
});
it('listens for cluster and process "exit" events', async function () {
const worker = setup();
sinon.spy(process, 'on');
sinon.spy(cluster, 'on');
await worker.start();
sinon.assert.calledOnce(cluster.on);
sinon.assert.calledWith(cluster.on, 'exit');
sinon.assert.calledOnce(process.on);
sinon.assert.calledWith(process.on, 'exit');
});
});
context('when already started', function () {
it('calls shutdown and waits for the graceful shutdown to cause a restart', async function () {
const worker = setup();
await worker.start();
sinon.spy(worker, 'shutdown');
sinon.spy(worker, 'on');
worker.start();
sinon.assert.calledOnce(worker.shutdown);
sinon.assert.calledWith(worker.on, 'online');
});
});
});
});

View file

@ -11,7 +11,7 @@ import BasePathProxy from './base_path_proxy';
process.env.kbnWorkerType = 'managr';
module.exports = class ClusterManager {
constructor(opts, settings) {
constructor(opts = {}, settings = {}) {
this.log = new Log(opts.quiet, opts.silent);
this.addedCount = 0;

View file

@ -1,9 +1,9 @@
import _ from 'lodash';
import cluster from 'cluster';
let { resolve } = require('path');
let { EventEmitter } = require('events');
import { resolve } from 'path';
import { EventEmitter } from 'events';
import fromRoot from '../../utils/from_root';
import { BinderFor, fromRoot } from '../../utils';
let cliPath = fromRoot('src/cli');
let baseArgs = _.difference(process.argv.slice(2), ['--no-watch']);
@ -18,13 +18,6 @@ let dead = fork => {
return fork.isDead() || fork.killed;
};
let kill = fork => {
// fork.kill() waits for process to disconnect, but causes occasional
// "ipc disconnected" errors and is too slow for the proc's "exit" event
fork.process.kill();
fork.killed = true;
};
module.exports = class Worker extends EventEmitter {
constructor(opts) {
opts = opts || {};
@ -36,26 +29,33 @@ module.exports = class Worker extends EventEmitter {
this.watch = (opts.watch !== false);
this.startCount = 0;
this.online = false;
this.listening = false;
this.changes = [];
this.forkBinder = null; // defined when the fork is
this.clusterBinder = new BinderFor(cluster);
this.processBinder = new BinderFor(process);
let argv = _.union(baseArgv, opts.argv || []);
this.env = {
kbnWorkerType: this.type,
kbnWorkerArgv: JSON.stringify(argv)
};
_.bindAll(this, ['onExit', 'onMessage', 'onOnline', 'onDisconnect', 'shutdown', 'start']);
this.start = _.debounce(this.start, 25);
cluster.on('exit', this.onExit);
process.on('exit', this.shutdown);
}
onExit(fork, code) {
if (this.fork !== fork) return;
// we have our fork's exit, so stop listening for others
this.clusterBinder.destroy();
// our fork is gone, clear our ref so we don't try to talk to it anymore
this.fork = null;
this.forkBinder = null;
this.online = false;
this.listening = false;
this.emit('fork:exit');
if (code) {
this.log.bad(`${this.title} crashed`, 'with status code', code);
@ -72,26 +72,48 @@ module.exports = class Worker extends EventEmitter {
this.start();
}
shutdown() {
async shutdown() {
if (this.fork && !dead(this.fork)) {
kill(this.fork);
this.fork.removeListener('message', this.onMessage);
this.fork.removeListener('online', this.onOnline);
this.fork.removeListener('disconnect', this.onDisconnect);
// kill the fork
this.fork.process.kill();
this.fork.killed = true;
// stop listening to the fork, it's just going to die
this.forkBinder.destroy();
// we don't need to react to process.exit anymore
this.processBinder.destroy();
// wait until the cluster reports this fork has exitted, then resolve
await new Promise(cb => this.once('fork:exit', cb));
}
}
onMessage(msg) {
if (!_.isArray(msg) || msg[0] !== 'WORKER_BROADCAST') return;
this.emit('broadcast', msg[1]);
parseIncomingMessage(msg) {
if (!_.isArray(msg)) return;
this.onMessage(...msg);
}
onMessage(type, data) {
switch (type) {
case 'WORKER_BROADCAST':
this.emit('broadcast', data);
break;
case 'WORKER_LISTENING':
this.listening = true;
this.emit('listening');
break;
}
}
onOnline() {
this.online = true;
this.emit('fork:online');
}
onDisconnect() {
this.online = false;
this.listening = false;
}
flushChangeBuffer() {
@ -102,9 +124,13 @@ module.exports = class Worker extends EventEmitter {
}, '');
}
start() {
// once "exit" event is received with 0 status, start() is called again
if (this.fork) return this.shutdown();
async start() {
if (this.fork) {
// once "exit" event is received with 0 status, start() is called again
this.shutdown();
await new Promise(cb => this.once('online', cb));
return;
}
if (this.changes.length) {
this.log.warn(`restarting ${this.title}`, `due to changes in ${this.flushChangeBuffer()}`);
@ -114,8 +140,20 @@ module.exports = class Worker extends EventEmitter {
}
this.fork = cluster.fork(this.env);
this.fork.on('message', this.onMessage);
this.fork.on('online', this.onOnline);
this.fork.on('disconnect', this.onDisconnect);
this.forkBinder = new BinderFor(this.fork);
// when the fork sends a message, comes online, or looses it's connection, then react
this.forkBinder.on('message', (msg) => this.parseIncomingMessage(msg));
this.forkBinder.on('online', () => this.onOnline());
this.forkBinder.on('disconnect', () => this.onDisconnect());
// when the cluster says a fork has exitted, check if it is ours
this.clusterBinder.on('exit', (fork, code) => this.onExit(fork, code));
// when the process exits, make sure we kill our workers
this.processBinder.on('exit', () => this.shutdown());
// wait for the fork to report it is online before resolving
await new Promise(cb => this.once('fork:online', cb));
}
};

View file

@ -2,7 +2,7 @@ import _ from 'lodash';
import fs from 'fs';
import yaml from 'js-yaml';
import fromRoot from '../../utils/from_root';
import { fromRoot } from '../../utils';
let legacySettingMap = {
// server

View file

@ -3,7 +3,7 @@ const { isWorker } = require('cluster');
const { resolve } = require('path');
const cwd = process.cwd();
import fromRoot from '../../utils/from_root';
import { fromRoot } from '../../utils';
let canCluster;
try {

View file

@ -1,6 +1,6 @@
import path from 'path';
import expect from 'expect.js';
import fromRoot from '../../../utils/from_root';
import { fromRoot } from '../../../utils';
import { resolve } from 'path';
import { parseMilliseconds, parse, getPlatform } from '../settings';

View file

@ -1,4 +1,4 @@
import fromRoot from '../../utils/from_root';
import { fromRoot } from '../../utils';
import install from './install';
import Logger from '../lib/logger';
import pkg from '../../utils/package_json';

View file

@ -1,5 +1,5 @@
import _ from 'lodash';
import fromRoot from '../../utils/from_root';
import { fromRoot } from '../../utils';
import KbnServer from '../../server/kbn_server';
import readYamlConfig from '../../cli/serve/read_yaml_config';
import { versionSatisfies, cleanVersion } from './version';

View file

@ -1,4 +1,4 @@
import fromRoot from '../../utils/from_root';
import { fromRoot } from '../../utils';
import list from './list';
import Logger from '../lib/logger';
import { parse } from './settings';

View file

@ -1,4 +1,4 @@
import fromRoot from '../../utils/from_root';
import { fromRoot } from '../../utils';
import remove from './remove';
import Logger from '../lib/logger';
import { parse } from './settings';

View file

@ -1,6 +1,6 @@
import LazyServer from './lazy_server';
import LazyOptimizer from './lazy_optimizer';
import fromRoot from '../../utils/from_root';
import { fromRoot } from '../../utils';
export default async (kbnServer, kibanaHapiServer, config) => {
let server = new LazyServer(

View file

@ -1,5 +1,5 @@
import fromRoot from '../../utils/from_root';
import { fromRoot } from '../../utils';
import { chain, memoize } from 'lodash';
import { resolve } from 'path';
import { map, fromNode } from 'bluebird';

View file

@ -1,7 +1,7 @@
import { union } from 'lodash';
import findSourceFiles from './find_source_files';
import fromRoot from '../../utils/from_root';
import { fromRoot } from '../../utils';
export default (kibana) => {
return new kibana.Plugin({

View file

@ -5,7 +5,7 @@ import { get } from 'lodash';
import { randomBytes } from 'crypto';
import os from 'os';
import fromRoot from '../../utils/from_root';
import { fromRoot } from '../../utils';
module.exports = () => Joi.object({
pkg: Joi.object({

View file

@ -1,8 +1,8 @@
import Hapi from 'hapi';
import { constant, once, compact, flatten } from 'lodash';
import { promisify, resolve, fromNode } from 'bluebird';
import fromRoot from '../utils/from_root';
import pkg from '../utils/package_json';
import { isWorker } from 'cluster';
import { fromRoot, pkg } from '../utils';
let rootDir = fromRoot('.');
@ -78,6 +78,11 @@ module.exports = class KbnServer {
await this.ready();
await fromNode(cb => server.start(cb));
if (isWorker) {
// help parent process know when we are ready
process.send(['WORKER_LISTENING']);
}
server.log(['listening', 'info'], `Server running at ${server.info.uri}`);
return server;
}

12
src/utils/binder_for.js Normal file
View file

@ -0,0 +1,12 @@
import { Binder } from './';
export default class BinderFor extends Binder {
constructor(emitter) {
super();
this.emitter = emitter;
}
on(...args) {
super.on(this.emitter, ...args);
}
}

4
src/utils/index.js Normal file
View file

@ -0,0 +1,4 @@
export Binder from './binder';
export BinderFor from './binder_for';
export fromRoot from './from_root';
export pkg from './package_json';