mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 17:59:23 -04:00
* chore(NA): revert changes introduced on pr 67059 * chore(NA): fix eslint problems
This commit is contained in:
parent
fc4d1b840f
commit
1193af829a
7 changed files with 28 additions and 342 deletions
|
@ -21,7 +21,6 @@ export * from './bundle';
|
|||
export * from './bundle_cache';
|
||||
export * from './worker_config';
|
||||
export * from './worker_messages';
|
||||
export * from './parent_messages';
|
||||
export * from './compiler_messages';
|
||||
export * from './ts_helpers';
|
||||
export * from './rxjs_helpers';
|
||||
|
|
|
@ -1,33 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch B.V. under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch B.V. licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
export interface ParentPongMsg {
|
||||
type: 'pong';
|
||||
}
|
||||
|
||||
export const isParentPong = (value: any): value is ParentPongMsg =>
|
||||
typeof value === 'object' && value && value.type === 'pong';
|
||||
|
||||
export class ParentMsgs {
|
||||
pong(): ParentPongMsg {
|
||||
return {
|
||||
type: 'pong',
|
||||
};
|
||||
}
|
||||
}
|
|
@ -24,17 +24,13 @@ import {
|
|||
CompilerErrorMsg,
|
||||
} from './compiler_messages';
|
||||
|
||||
export type InternalWorkerMsg =
|
||||
| WorkerPingMsg
|
||||
export type WorkerMsg =
|
||||
| CompilerRunningMsg
|
||||
| CompilerIssueMsg
|
||||
| CompilerSuccessMsg
|
||||
| CompilerErrorMsg
|
||||
| WorkerErrorMsg;
|
||||
|
||||
// ping messages are internal, they don't apper in public message streams
|
||||
export type WorkerMsg = Exclude<InternalWorkerMsg, WorkerPingMsg>;
|
||||
|
||||
/**
|
||||
* Message sent when the worker encounters an error that it can't
|
||||
* recover from, no more messages will be sent and the worker
|
||||
|
@ -46,10 +42,6 @@ export interface WorkerErrorMsg {
|
|||
errorStack?: string;
|
||||
}
|
||||
|
||||
export interface WorkerPingMsg {
|
||||
type: 'ping';
|
||||
}
|
||||
|
||||
const WORKER_STATE_TYPES: ReadonlyArray<WorkerMsg['type']> = [
|
||||
'running',
|
||||
'compiler issue',
|
||||
|
@ -58,19 +50,10 @@ const WORKER_STATE_TYPES: ReadonlyArray<WorkerMsg['type']> = [
|
|||
'worker error',
|
||||
];
|
||||
|
||||
export const isWorkerPing = (value: any): value is WorkerPingMsg =>
|
||||
typeof value === 'object' && value && value.type === 'ping';
|
||||
|
||||
export const isWorkerMsg = (value: any): value is WorkerMsg =>
|
||||
typeof value === 'object' && value && WORKER_STATE_TYPES.includes(value.type);
|
||||
|
||||
export class WorkerMsgs {
|
||||
ping(): WorkerPingMsg {
|
||||
return {
|
||||
type: 'ping',
|
||||
};
|
||||
}
|
||||
|
||||
error(error: Error): WorkerErrorMsg {
|
||||
return {
|
||||
type: 'worker error',
|
||||
|
|
|
@ -22,14 +22,12 @@ import { Readable } from 'stream';
|
|||
import { inspect } from 'util';
|
||||
|
||||
import * as Rx from 'rxjs';
|
||||
import { map, filter, takeUntil } from 'rxjs/operators';
|
||||
import { map, takeUntil } from 'rxjs/operators';
|
||||
|
||||
import { isWorkerMsg, isWorkerPing, WorkerConfig, WorkerMsg, Bundle, ParentMsgs } from '../common';
|
||||
import { isWorkerMsg, WorkerConfig, WorkerMsg, Bundle } from '../common';
|
||||
|
||||
import { OptimizerConfig } from './optimizer_config';
|
||||
|
||||
const parentMsgs = new ParentMsgs();
|
||||
|
||||
export interface WorkerStdio {
|
||||
type: 'worker stdio';
|
||||
stream: 'stdout' | 'stderr';
|
||||
|
@ -148,16 +146,6 @@ export function observeWorker(
|
|||
observeStdio$(proc.stderr, 'stderr'),
|
||||
Rx.fromEvent<[unknown]>(proc, 'message')
|
||||
.pipe(
|
||||
// filter out ping messages so they don't end up in the general message stream
|
||||
filter(([msg]) => {
|
||||
if (!isWorkerPing(msg)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
proc.send(parentMsgs.pong());
|
||||
return false;
|
||||
}),
|
||||
|
||||
// validate the messages from the process
|
||||
map(([msg]) => {
|
||||
if (!isWorkerMsg(msg)) {
|
||||
|
|
|
@ -1,166 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch B.V. under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch B.V. licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
import { EventEmitter } from 'events';
|
||||
import { inspect } from 'util';
|
||||
|
||||
import * as Rx from 'rxjs';
|
||||
import { tap, takeUntil } from 'rxjs/operators';
|
||||
|
||||
import { observeParentOffline, Process } from './observe_parent_offline';
|
||||
import { WorkerMsgs, ParentMsgs, isWorkerPing } from '../common';
|
||||
|
||||
jest.useFakeTimers();
|
||||
|
||||
beforeEach(() => {
|
||||
jest.clearAllTimers();
|
||||
});
|
||||
|
||||
const workerMsgs = new WorkerMsgs();
|
||||
const parentMsgs = new ParentMsgs();
|
||||
class MockProcess extends EventEmitter implements Process {
|
||||
connected?: boolean;
|
||||
send?: jest.Mock;
|
||||
|
||||
constructor(options: { connected?: boolean; send?: jest.Mock | false } = {}) {
|
||||
super();
|
||||
|
||||
this.connected = options.connected ?? true;
|
||||
this.send = options.send === false ? undefined : options.send ?? jest.fn();
|
||||
}
|
||||
}
|
||||
|
||||
async function record(observable: Rx.Observable<any>): Promise<string[]> {
|
||||
const notes: string[] = [];
|
||||
|
||||
await observable
|
||||
.pipe(
|
||||
tap({
|
||||
next(value) {
|
||||
notes.push(`next: ${inspect(value)}`);
|
||||
},
|
||||
error(error) {
|
||||
notes.push(`error: ${inspect(error)}`);
|
||||
},
|
||||
complete() {
|
||||
notes.push(`complete`);
|
||||
},
|
||||
})
|
||||
)
|
||||
.toPromise();
|
||||
|
||||
return notes;
|
||||
}
|
||||
|
||||
async function waitForTick() {
|
||||
await new Promise((resolve) => {
|
||||
process.nextTick(resolve);
|
||||
});
|
||||
}
|
||||
|
||||
describe('emits and completes when parent exists because:', () => {
|
||||
test('process.connected is false', async () => {
|
||||
const mockProc = new MockProcess({
|
||||
connected: false,
|
||||
});
|
||||
|
||||
const promise = record(observeParentOffline(mockProc, workerMsgs));
|
||||
jest.advanceTimersToNextTimer();
|
||||
expect(await promise).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"next: 'parent offline (disconnected)'",
|
||||
"complete",
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
test('process.send is falsey', async () => {
|
||||
const mockProc = new MockProcess({
|
||||
send: false,
|
||||
});
|
||||
|
||||
const promise = record(observeParentOffline(mockProc, workerMsgs));
|
||||
jest.advanceTimersToNextTimer();
|
||||
expect(await promise).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"next: 'parent offline (disconnected)'",
|
||||
"complete",
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
test('process.send throws "ERR_IPC_CHANNEL_CLOSED"', async () => {
|
||||
const mockProc = new MockProcess({
|
||||
send: jest.fn(() => {
|
||||
const error = new Error();
|
||||
(error as any).code = 'ERR_IPC_CHANNEL_CLOSED';
|
||||
throw error;
|
||||
}),
|
||||
});
|
||||
|
||||
const promise = record(observeParentOffline(mockProc, workerMsgs));
|
||||
jest.advanceTimersToNextTimer();
|
||||
expect(await promise).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"next: 'parent offline (ipc channel exception)'",
|
||||
"complete",
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
test('ping timeout', async () => {
|
||||
const mockProc = new MockProcess({});
|
||||
|
||||
const promise = record(observeParentOffline(mockProc, workerMsgs));
|
||||
jest.advanceTimersByTime(10000);
|
||||
expect(await promise).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"next: 'parent offline (ping timeout)'",
|
||||
"complete",
|
||||
]
|
||||
`);
|
||||
});
|
||||
});
|
||||
|
||||
test('it emits nothing if parent responds with pongs', async () => {
|
||||
const send = jest.fn((msg: any) => {
|
||||
if (isWorkerPing(msg)) {
|
||||
process.nextTick(() => {
|
||||
mockProc.emit('message', parentMsgs.pong(), undefined);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
const mockProc = new MockProcess({ send });
|
||||
const unsub$ = new Rx.Subject();
|
||||
const promise = record(observeParentOffline(mockProc, workerMsgs).pipe(takeUntil(unsub$)));
|
||||
|
||||
jest.advanceTimersByTime(5000);
|
||||
await waitForTick();
|
||||
jest.advanceTimersByTime(5000);
|
||||
await waitForTick();
|
||||
unsub$.next();
|
||||
|
||||
expect(await promise).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"complete",
|
||||
]
|
||||
`);
|
||||
expect(send).toHaveBeenCalledTimes(2);
|
||||
});
|
|
@ -1,90 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch B.V. under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch B.V. licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
import { EventEmitter } from 'events';
|
||||
|
||||
import * as Rx from 'rxjs';
|
||||
import { mergeMap, take, first, map, catchError } from 'rxjs/operators';
|
||||
|
||||
import { isParentPong, WorkerMsgs } from '../common';
|
||||
|
||||
const sleep = (ms: number) => Rx.timer(ms).pipe(take(1));
|
||||
|
||||
export interface Process extends EventEmitter {
|
||||
connected?: boolean;
|
||||
send?: (msg: any) => void;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an observable that will emit a value when the parent
|
||||
* process goes offline. It accomplishes this by merging several
|
||||
* signals:
|
||||
*
|
||||
* 1. process "disconnect" event
|
||||
* 2. process.connected or process.send are falsy
|
||||
* 3. a ping was sent to the parent process but it didn't respond
|
||||
* with a pong within 5 seconds
|
||||
* 4. a ping was sent to the parent process but the process.send
|
||||
* call errored with an 'ERR_IPC_CHANNEL_CLOSED' exception
|
||||
*/
|
||||
export function observeParentOffline(process: Process, workerMsgs: WorkerMsgs) {
|
||||
return sleep(5000).pipe(
|
||||
mergeMap(() => {
|
||||
if (!process.connected || !process.send) {
|
||||
return Rx.of('parent offline (disconnected)');
|
||||
}
|
||||
|
||||
process.send(workerMsgs.ping());
|
||||
|
||||
const pong$ = Rx.fromEvent<[any]>(process, 'message').pipe(
|
||||
first(([msg]) => isParentPong(msg)),
|
||||
map(() => {
|
||||
throw new Error('parent still online');
|
||||
})
|
||||
);
|
||||
|
||||
// give the parent some time to respond, if the ping
|
||||
// wins the race the parent is considered online
|
||||
const timeout$ = sleep(5000).pipe(map(() => 'parent offline (ping timeout)'));
|
||||
|
||||
return Rx.race(pong$, timeout$);
|
||||
}),
|
||||
|
||||
/**
|
||||
* resubscribe to the source observable (triggering the timer,
|
||||
* ping, wait for response) if the source observable does not
|
||||
* observe the parent being offline yet.
|
||||
*
|
||||
* Scheduling the interval this way prevents the ping timeout
|
||||
* from overlaping with the interval by only scheduling the
|
||||
* next ping once the previous ping has completed
|
||||
*/
|
||||
catchError((error, resubscribe) => {
|
||||
if (error.code === 'ERR_IPC_CHANNEL_CLOSED') {
|
||||
return Rx.of('parent offline (ipc channel exception)');
|
||||
}
|
||||
|
||||
if (error.message === 'parent still online') {
|
||||
return resubscribe;
|
||||
}
|
||||
|
||||
throw error;
|
||||
})
|
||||
);
|
||||
}
|
|
@ -18,12 +18,10 @@
|
|||
*/
|
||||
|
||||
import * as Rx from 'rxjs';
|
||||
import { takeUntil } from 'rxjs/operators';
|
||||
|
||||
import { parseBundles, parseWorkerConfig, WorkerMsg, isWorkerMsg, WorkerMsgs } from '../common';
|
||||
|
||||
import { runCompilers } from './run_compilers';
|
||||
import { observeParentOffline } from './observe_parent_offline';
|
||||
|
||||
/**
|
||||
**
|
||||
|
@ -66,6 +64,15 @@ const exit = (code: number) => {
|
|||
}, 5000).unref();
|
||||
};
|
||||
|
||||
// check for connected parent on an unref'd timer rather than listening
|
||||
// to "disconnect" since that listner prevents the process from exiting
|
||||
setInterval(() => {
|
||||
if (!process.connected) {
|
||||
// parent is gone
|
||||
process.exit(0);
|
||||
}
|
||||
}, 1000).unref();
|
||||
|
||||
Rx.defer(() => {
|
||||
const workerConfig = parseWorkerConfig(process.argv[2]);
|
||||
const bundles = parseBundles(process.argv[3]);
|
||||
|
@ -74,22 +81,20 @@ Rx.defer(() => {
|
|||
process.env.BROWSERSLIST_ENV = workerConfig.browserslistEnv;
|
||||
|
||||
return runCompilers(workerConfig, bundles);
|
||||
})
|
||||
.pipe(takeUntil(observeParentOffline(process, workerMsgs)))
|
||||
.subscribe(
|
||||
(msg) => {
|
||||
send(msg);
|
||||
},
|
||||
(error) => {
|
||||
if (isWorkerMsg(error)) {
|
||||
send(error);
|
||||
} else {
|
||||
send(workerMsgs.error(error));
|
||||
}
|
||||
|
||||
exit(1);
|
||||
},
|
||||
() => {
|
||||
exit(0);
|
||||
}).subscribe(
|
||||
(msg) => {
|
||||
send(msg);
|
||||
},
|
||||
(error) => {
|
||||
if (isWorkerMsg(error)) {
|
||||
send(error);
|
||||
} else {
|
||||
send(workerMsgs.error(error));
|
||||
}
|
||||
);
|
||||
|
||||
exit(1);
|
||||
},
|
||||
() => {
|
||||
exit(0);
|
||||
}
|
||||
);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue