[ML] Fixing job progress subscriber leak (#43767) (#43851)

This commit is contained in:
James Gowdy 2019-08-23 12:29:20 +01:00 committed by GitHub
parent 04f79dc0c1
commit f80793b0c0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 7 deletions

View file

@ -28,13 +28,13 @@ export class JobCreator {
protected _useDedicatedIndex: boolean = false;
protected _start: number = 0;
protected _end: number = 0;
protected _subscribers: ProgressSubscriber[];
protected _subscribers: ProgressSubscriber[] = [];
protected _aggs: Aggregation[] = [];
protected _fields: Field[] = [];
protected _sparseData: boolean = false;
private _stopAllRefreshPolls: {
stop: boolean;
};
} = { stop: false };
constructor(indexPattern: IndexPattern, savedSearch: SavedSearch, query: object) {
this._indexPattern = indexPattern;
@ -51,8 +51,6 @@ export class JobCreator {
}
this._datafeed_config.query = query;
this._subscribers = [];
this._stopAllRefreshPolls = { stop: false };
}
public get type(): JOB_TYPE {

View file

@ -28,6 +28,7 @@ export class JobRunner {
private _stopRefreshPoll: {
stop: boolean;
};
private _subscribers: ProgressSubscriber[];
constructor(jobCreator: JobCreator) {
this._jobId = jobCreator.jobId;
@ -38,9 +39,7 @@ export class JobRunner {
this._stopRefreshPoll = jobCreator.stopAllRefreshPolls;
this._progress$ = new BehaviorSubject(this._percentageComplete);
// link the _subscribers list from the JobCreator
// to the progress BehaviorSubject.
jobCreator.subscribers.forEach(s => this._progress$.subscribe(s));
this._subscribers = jobCreator.subscribers;
}
public get datafeedState(): DATAFEED_STATE {
@ -72,6 +71,11 @@ export class JobRunner {
pollProgress: boolean
): Promise<boolean> {
try {
// link the _subscribers list from the JobCreator
// to the progress BehaviorSubject.
const subscriptions =
pollProgress === true ? this._subscribers.map(s => this._progress$.subscribe(s)) : [];
await this.openJob();
const { started } = await mlJobService.startDatafeed(
this._datafeedId,
@ -93,6 +97,9 @@ export class JobRunner {
setTimeout(async () => {
await check();
}, this._refreshInterval);
} else {
// job has finished running, unsubscribe everyone
subscriptions.forEach(s => s.unsubscribe());
}
};
// wait for the first check to run and then return success.