mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
* [Code] interrupt clone if disk usage goes above watermark during clone * minor refactor * add unit tests * minor change * fix type check * minor fix * fix test
This commit is contained in:
parent
c53e75d963
commit
d2a52c0888
14 changed files with 239 additions and 88 deletions
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
|
||||
import { IndexRequest } from './search';
|
||||
import { CancellationReason } from '../server/queue/cancellation_service';
|
||||
|
||||
export type RepositoryUri = string;
|
||||
|
||||
|
@ -86,12 +87,13 @@ export enum FileTreeItemType {
|
|||
export interface WorkerResult {
|
||||
uri: string;
|
||||
cancelled?: boolean;
|
||||
cancelledReason?: CancellationReason;
|
||||
}
|
||||
|
||||
// TODO(mengwei): create a AbstractGitWorkerResult since we now have an
|
||||
// AbstractGitWorker now.
|
||||
export interface CloneWorkerResult extends WorkerResult {
|
||||
repo: Repository;
|
||||
repo?: Repository;
|
||||
}
|
||||
|
||||
export interface DeleteWorkerResult extends WorkerResult {
|
||||
|
|
|
@ -12,13 +12,13 @@ import path from 'path';
|
|||
import rimraf from 'rimraf';
|
||||
import sinon from 'sinon';
|
||||
|
||||
import { Repository } from '../../model';
|
||||
import { CloneWorkerResult, Repository } from '../../model';
|
||||
import { DiskWatermarkService } from '../disk_watermark';
|
||||
import { GitOperations } from '../git_operations';
|
||||
import { EsClient, Esqueue } from '../lib/esqueue';
|
||||
import { Logger } from '../log';
|
||||
import { CloneWorker, IndexWorker } from '../queue';
|
||||
import { CancellationSerivce } from '../queue/cancellation_service';
|
||||
import { CancellationReason, CancellationSerivce } from '../queue/cancellation_service';
|
||||
import { RepositoryServiceFactory } from '../repository_service_factory';
|
||||
import { createTestServerOption, emptyAsyncFunc } from '../test_utils';
|
||||
import { ConsoleLoggerFactory } from '../utils/console_logger_factory';
|
||||
|
@ -372,34 +372,34 @@ describe('clone_worker_tests', () => {
|
|||
diskWatermarkService as DiskWatermarkService
|
||||
);
|
||||
|
||||
const result1 = await cloneWorker.executeJob({
|
||||
const result1 = (await cloneWorker.executeJob({
|
||||
payload: {
|
||||
url: 'file:///foo/bar.git',
|
||||
},
|
||||
options: {},
|
||||
timestamp: 0,
|
||||
});
|
||||
})) as CloneWorkerResult;
|
||||
|
||||
assert.ok(result1.repo === null);
|
||||
assert.ok(!result1.repo);
|
||||
assert.ok(newInstanceSpy.notCalled);
|
||||
assert.ok(cloneSpy.notCalled);
|
||||
assert.ok(isLowWatermarkSpy.calledOnce);
|
||||
|
||||
const result2 = await cloneWorker.executeJob({
|
||||
const result2 = (await cloneWorker.executeJob({
|
||||
payload: {
|
||||
url: '/foo/bar.git',
|
||||
},
|
||||
options: {},
|
||||
timestamp: 0,
|
||||
});
|
||||
})) as CloneWorkerResult;
|
||||
|
||||
assert.ok(result2.repo === null);
|
||||
assert.ok(!result2.repo);
|
||||
assert.ok(newInstanceSpy.notCalled);
|
||||
assert.ok(cloneSpy.notCalled);
|
||||
assert.ok(isLowWatermarkSpy.calledTwice);
|
||||
});
|
||||
|
||||
it('Execute clone job failed because of low disk watermark', async () => {
|
||||
it('Execute clone job failed because of low available disk space', async () => {
|
||||
// Setup RepositoryService
|
||||
const cloneSpy = sinon.spy();
|
||||
const repoService = {
|
||||
|
@ -428,28 +428,44 @@ describe('clone_worker_tests', () => {
|
|||
const isLowWatermarkSpy = sinon.stub().resolves(true);
|
||||
const diskWatermarkService: any = {
|
||||
isLowWatermark: isLowWatermarkSpy,
|
||||
diskWatermarkViolationMessage: sinon.stub().returns('No enough disk space'),
|
||||
};
|
||||
|
||||
// Setup EsClient
|
||||
const updateSpy = sinon.spy();
|
||||
const esClient = {
|
||||
update: emptyAsyncFunc,
|
||||
};
|
||||
esClient.update = updateSpy;
|
||||
|
||||
// Setup IndexWorker
|
||||
const enqueueJobSpy = sinon.spy();
|
||||
const indexWorker = {
|
||||
enqueueJob: emptyAsyncFunc,
|
||||
};
|
||||
indexWorker.enqueueJob = enqueueJobSpy;
|
||||
|
||||
const cloneWorker = new CloneWorker(
|
||||
esQueue as Esqueue,
|
||||
log,
|
||||
{} as EsClient,
|
||||
esClient as EsClient,
|
||||
serverOptions,
|
||||
gitOps,
|
||||
{} as IndexWorker,
|
||||
(indexWorker as any) as IndexWorker,
|
||||
(repoServiceFactory as any) as RepositoryServiceFactory,
|
||||
cancellationService as CancellationSerivce,
|
||||
diskWatermarkService as DiskWatermarkService
|
||||
);
|
||||
|
||||
let res: CloneWorkerResult = { uri: 'github.com/Microsoft/TypeScript-Node-Starter' };
|
||||
try {
|
||||
await cloneWorker.executeJob({
|
||||
res = (await cloneWorker.executeJob({
|
||||
payload: {
|
||||
url: 'https://github.com/Microsoft/TypeScript-Node-Starter.git',
|
||||
},
|
||||
options: {},
|
||||
timestamp: 0,
|
||||
});
|
||||
})) as CloneWorkerResult;
|
||||
// This step should not be touched.
|
||||
assert.ok(false);
|
||||
} catch (error) {
|
||||
|
@ -457,5 +473,29 @@ describe('clone_worker_tests', () => {
|
|||
assert.ok(newInstanceSpy.notCalled);
|
||||
assert.ok(cloneSpy.notCalled);
|
||||
}
|
||||
|
||||
assert.ok(res.cancelled);
|
||||
assert.ok(res.cancelledReason === CancellationReason.LOW_DISK_SPACE);
|
||||
|
||||
const onJobExecutionErrorSpy = sinon.spy();
|
||||
cloneWorker.onJobExecutionError = onJobExecutionErrorSpy;
|
||||
|
||||
await cloneWorker.onJobCompleted(
|
||||
{
|
||||
payload: {
|
||||
url: 'https://github.com/Microsoft/TypeScript-Node-Starter.git',
|
||||
},
|
||||
options: {},
|
||||
timestamp: 0,
|
||||
},
|
||||
res
|
||||
);
|
||||
|
||||
assert.ok(onJobExecutionErrorSpy.calledOnce);
|
||||
// Non of the follow up steps of a normal complete job should not be called
|
||||
// because the job is going to be forwarded as execution error.
|
||||
assert.ok(updateSpy.notCalled);
|
||||
await delay(1000);
|
||||
assert.ok(enqueueJobSpy.notCalled);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -5,6 +5,6 @@
|
|||
*/
|
||||
|
||||
export class CancellationToken {
|
||||
public on(callback: () => void): void;
|
||||
public cancel(): void;
|
||||
public on(callback: (reason: string) => void): void;
|
||||
public cancel(reason: string): void;
|
||||
}
|
||||
|
|
|
@ -16,15 +16,14 @@ export class CancellationToken {
|
|||
}
|
||||
|
||||
if (this.isCancelled) {
|
||||
callback();
|
||||
return;
|
||||
}
|
||||
|
||||
this._callbacks.push(callback);
|
||||
};
|
||||
|
||||
cancel = () => {
|
||||
cancel = (reason) => {
|
||||
this.isCancelled = true;
|
||||
this._callbacks.forEach(callback => callback());
|
||||
this._callbacks.forEach(callback => callback(reason));
|
||||
};
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ import { Logger } from '../log';
|
|||
import { RepositoryObjectClient } from '../search';
|
||||
import { ServerOptions } from '../server_options';
|
||||
import { AbstractWorker } from './abstract_worker';
|
||||
import { CancellationReason } from './cancellation_service';
|
||||
import { Job } from './job';
|
||||
|
||||
export abstract class AbstractGitWorker extends AbstractWorker {
|
||||
|
@ -36,23 +37,21 @@ export abstract class AbstractGitWorker extends AbstractWorker {
|
|||
this.objectClient = new RepositoryObjectClient(client);
|
||||
}
|
||||
|
||||
public async executeJob(_: Job): Promise<WorkerResult> {
|
||||
public async executeJob(job: Job): Promise<WorkerResult> {
|
||||
const uri = job.payload.uri;
|
||||
if (await this.watermarkService.isLowWatermark()) {
|
||||
const msg = this.watermarkService.diskWatermarkViolationMessage();
|
||||
this.log.error(msg);
|
||||
throw new Error(msg);
|
||||
// Return job result as cancelled.
|
||||
return {
|
||||
uri,
|
||||
cancelled: true,
|
||||
cancelledReason: CancellationReason.LOW_DISK_SPACE,
|
||||
};
|
||||
}
|
||||
|
||||
return new Promise<WorkerResult>((resolve, reject) => {
|
||||
resolve();
|
||||
});
|
||||
return { uri };
|
||||
}
|
||||
|
||||
public async onJobCompleted(job: Job, res: CloneWorkerResult) {
|
||||
if (res.cancelled) {
|
||||
// Skip updating job progress if the job is done because of cancellation.
|
||||
return;
|
||||
}
|
||||
await super.onJobCompleted(job, res);
|
||||
|
||||
// Update the default branch.
|
||||
|
@ -108,4 +107,17 @@ export abstract class AbstractGitWorker extends AbstractWorker {
|
|||
// this.log.warn(err);
|
||||
}
|
||||
}
|
||||
|
||||
protected async onJobCancelled(job: Job, reason?: CancellationReason) {
|
||||
if (reason && reason === CancellationReason.LOW_DISK_SPACE) {
|
||||
// If the clone/update job is cancelled because of the disk watermark, manually
|
||||
// trigger onJobExecutionError.
|
||||
const msg = this.watermarkService.diskWatermarkViolationMessage();
|
||||
this.log.error(
|
||||
'Git clone/update job completed because of low disk space. Move forward as error.'
|
||||
);
|
||||
const error = new Error(msg);
|
||||
await this.onJobExecutionError({ job, error });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,7 +8,7 @@ import { CancellationToken } from '../lib/esqueue';
|
|||
|
||||
import sinon from 'sinon';
|
||||
|
||||
import { CancellationSerivce } from './cancellation_service';
|
||||
import { CancellationReason, CancellationSerivce } from './cancellation_service';
|
||||
|
||||
afterEach(() => {
|
||||
sinon.restore();
|
||||
|
@ -30,9 +30,9 @@ test('Register and cancel cancellation token', async () => {
|
|||
const promise = new Promise(resolve => {
|
||||
promiseResolve = resolve;
|
||||
});
|
||||
await service.registerCancelableIndexJob(repoUri, token as CancellationToken, promise);
|
||||
await service.registerCancelableIndexJob(repoUri, (token as any) as CancellationToken, promise);
|
||||
// do not wait on the promise, or there will be a dead lock
|
||||
const cancelPromise = service.cancelIndexJob(repoUri);
|
||||
const cancelPromise = service.cancelIndexJob(repoUri, CancellationReason.NEW_JOB_OVERRIDEN);
|
||||
// resolve the promise now
|
||||
promiseResolve();
|
||||
|
||||
|
@ -57,10 +57,10 @@ test('Register and cancel cancellation token while an exception is thrown from t
|
|||
const promise = new Promise((resolve, reject) => {
|
||||
promiseReject = reject;
|
||||
});
|
||||
await service.registerCancelableIndexJob(repoUri, token as CancellationToken, promise);
|
||||
await service.registerCancelableIndexJob(repoUri, (token as any) as CancellationToken, promise);
|
||||
// expect no exceptions are thrown when cancelling the job
|
||||
// do not wait on the promise, or there will be a dead lock
|
||||
const cancelPromise = service.cancelIndexJob(repoUri);
|
||||
const cancelPromise = service.cancelIndexJob(repoUri, CancellationReason.NEW_JOB_OVERRIDEN);
|
||||
// reject the promise now
|
||||
promiseReject();
|
||||
|
||||
|
|
|
@ -12,6 +12,12 @@ interface CancellableJob {
|
|||
jobPromise: Promise<any>;
|
||||
}
|
||||
|
||||
export enum CancellationReason {
|
||||
REPOSITORY_DELETE = 'Cancel job because of deleting the entire repository',
|
||||
LOW_DISK_SPACE = 'Cancel job because of low available disk space',
|
||||
NEW_JOB_OVERRIDEN = 'Cancel job because of a new job of the same type has been registered',
|
||||
}
|
||||
|
||||
export class CancellationSerivce {
|
||||
private cloneCancellationMap: Map<RepositoryUri, CancellableJob>;
|
||||
private updateCancellationMap: Map<RepositoryUri, CancellableJob>;
|
||||
|
@ -23,16 +29,16 @@ export class CancellationSerivce {
|
|||
this.indexCancellationMap = new Map<RepositoryUri, CancellableJob>();
|
||||
}
|
||||
|
||||
public async cancelCloneJob(repoUri: RepositoryUri) {
|
||||
await this.cancelJob(this.cloneCancellationMap, repoUri);
|
||||
public async cancelCloneJob(repoUri: RepositoryUri, reason: CancellationReason) {
|
||||
await this.cancelJob(this.cloneCancellationMap, repoUri, reason);
|
||||
}
|
||||
|
||||
public async cancelUpdateJob(repoUri: RepositoryUri) {
|
||||
await this.cancelJob(this.updateCancellationMap, repoUri);
|
||||
public async cancelUpdateJob(repoUri: RepositoryUri, reason: CancellationReason) {
|
||||
await this.cancelJob(this.updateCancellationMap, repoUri, reason);
|
||||
}
|
||||
|
||||
public async cancelIndexJob(repoUri: RepositoryUri) {
|
||||
await this.cancelJob(this.indexCancellationMap, repoUri);
|
||||
public async cancelIndexJob(repoUri: RepositoryUri, reason: CancellationReason) {
|
||||
await this.cancelJob(this.indexCancellationMap, repoUri, reason);
|
||||
}
|
||||
|
||||
public async registerCancelableCloneJob(
|
||||
|
@ -66,7 +72,7 @@ export class CancellationSerivce {
|
|||
jobPromise: Promise<any>
|
||||
) {
|
||||
// Try to cancel the job first.
|
||||
await this.cancelJob(jobMap, repoUri);
|
||||
await this.cancelJob(jobMap, repoUri, CancellationReason.NEW_JOB_OVERRIDEN);
|
||||
jobMap.set(repoUri, { token, jobPromise });
|
||||
// remove the record from the cancellation service when the promise is fulfilled or rejected.
|
||||
jobPromise.finally(() => {
|
||||
|
@ -74,12 +80,16 @@ export class CancellationSerivce {
|
|||
});
|
||||
}
|
||||
|
||||
private async cancelJob(jobMap: Map<RepositoryUri, CancellableJob>, repoUri: RepositoryUri) {
|
||||
private async cancelJob(
|
||||
jobMap: Map<RepositoryUri, CancellableJob>,
|
||||
repoUri: RepositoryUri,
|
||||
reason: CancellationReason
|
||||
) {
|
||||
const payload = jobMap.get(repoUri);
|
||||
if (payload) {
|
||||
const { token, jobPromise } = payload;
|
||||
// 1. Use the cancellation token to pass cancel message to job
|
||||
token.cancel();
|
||||
token.cancel(reason);
|
||||
// 2. waiting on the actual job promise to be resolved
|
||||
try {
|
||||
await jobPromise;
|
||||
|
|
|
@ -21,7 +21,7 @@ import { Logger } from '../log';
|
|||
import { RepositoryServiceFactory } from '../repository_service_factory';
|
||||
import { ServerOptions } from '../server_options';
|
||||
import { AbstractGitWorker } from './abstract_git_worker';
|
||||
import { CancellationSerivce } from './cancellation_service';
|
||||
import { CancellationReason, CancellationSerivce } from './cancellation_service';
|
||||
import { IndexWorker } from './index_worker';
|
||||
import { Job } from './job';
|
||||
|
||||
|
@ -43,7 +43,10 @@ export class CloneWorker extends AbstractGitWorker {
|
|||
}
|
||||
|
||||
public async executeJob(job: Job) {
|
||||
await super.executeJob(job);
|
||||
const superRes = await super.executeJob(job);
|
||||
if (superRes.cancelled) {
|
||||
return superRes;
|
||||
}
|
||||
|
||||
const { payload, cancellationToken } = job;
|
||||
const { url } = payload;
|
||||
|
@ -58,9 +61,7 @@ export class CloneWorker extends AbstractGitWorker {
|
|||
this.log.error(error);
|
||||
return {
|
||||
uri: url,
|
||||
// Return a null repo for invalid git url.
|
||||
repo: null,
|
||||
};
|
||||
} as CloneWorkerResult;
|
||||
}
|
||||
|
||||
this.log.info(`Execute clone job for ${url}`);
|
||||
|
@ -73,22 +74,36 @@ export class CloneWorker extends AbstractGitWorker {
|
|||
const repo = RepositoryUtils.buildRepository(url);
|
||||
|
||||
// Try to cancel any existing clone job for this repository.
|
||||
this.cancellationService.cancelCloneJob(repo.uri);
|
||||
this.cancellationService.cancelCloneJob(repo.uri, CancellationReason.NEW_JOB_OVERRIDEN);
|
||||
|
||||
let cancelled = false;
|
||||
let cancelledReason;
|
||||
if (cancellationToken) {
|
||||
cancellationToken.on(() => {
|
||||
cancellationToken.on((reason: string) => {
|
||||
cancelled = true;
|
||||
cancelledReason = reason;
|
||||
});
|
||||
}
|
||||
|
||||
const cloneJobPromise = repoService.clone(
|
||||
repo,
|
||||
(progress: number, cloneProgress?: CloneProgress) => {
|
||||
async (progress: number, cloneProgress?: CloneProgress) => {
|
||||
if (cancelled) {
|
||||
// return false to stop the clone progress
|
||||
return false;
|
||||
}
|
||||
|
||||
// Keep an eye on the disk usage during clone in case it goes above the
|
||||
// disk watermark config.
|
||||
if (await this.watermarkService.isLowWatermark()) {
|
||||
// Cancel this clone job
|
||||
if (cancellationToken) {
|
||||
cancellationToken.cancel(CancellationReason.LOW_DISK_SPACE);
|
||||
}
|
||||
// return false to stop the clone progress
|
||||
return false;
|
||||
}
|
||||
|
||||
// For clone job payload, it only has the url. Populate back the
|
||||
// repository uri before update progress.
|
||||
job.payload.uri = repo.uri;
|
||||
|
@ -104,26 +119,31 @@ export class CloneWorker extends AbstractGitWorker {
|
|||
cloneJobPromise
|
||||
);
|
||||
}
|
||||
return await cloneJobPromise;
|
||||
const res = await cloneJobPromise;
|
||||
return {
|
||||
...res,
|
||||
cancelled,
|
||||
cancelledReason,
|
||||
};
|
||||
}
|
||||
|
||||
public async onJobCompleted(job: Job, res: CloneWorkerResult) {
|
||||
if (res.cancelled) {
|
||||
await this.onJobCancelled(job, res.cancelledReason);
|
||||
// Skip updating job progress if the job is done because of cancellation.
|
||||
return;
|
||||
}
|
||||
this.log.info(`Clone job done for ${res.repo.uri}`);
|
||||
|
||||
const { uri, revision } = res.repo!;
|
||||
this.log.info(`Clone job done for ${uri}`);
|
||||
// For clone job payload, it only has the url. Populate back the
|
||||
// repository uri.
|
||||
job.payload.uri = res.repo.uri;
|
||||
job.payload.uri = uri;
|
||||
await super.onJobCompleted(job, res);
|
||||
|
||||
// Throw out a repository index request after 1 second.
|
||||
return delay(async () => {
|
||||
const payload = {
|
||||
uri: res.repo.uri,
|
||||
revision: res.repo.revision,
|
||||
};
|
||||
const payload = { uri, revision };
|
||||
await this.indexWorker.enqueueJob(payload, {});
|
||||
}, 1000);
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@ import { RepositoryServiceFactory } from '../repository_service_factory';
|
|||
import { RepositoryObjectClient } from '../search';
|
||||
import { ServerOptions } from '../server_options';
|
||||
import { AbstractWorker } from './abstract_worker';
|
||||
import { CancellationSerivce } from './cancellation_service';
|
||||
import { CancellationReason, CancellationSerivce } from './cancellation_service';
|
||||
import { Job } from './job';
|
||||
|
||||
export class DeleteWorker extends AbstractWorker {
|
||||
|
@ -42,9 +42,9 @@ export class DeleteWorker extends AbstractWorker {
|
|||
const { uri } = job.payload;
|
||||
|
||||
// 1. Cancel running workers
|
||||
await this.cancellationService.cancelCloneJob(uri);
|
||||
await this.cancellationService.cancelUpdateJob(uri);
|
||||
await this.cancellationService.cancelIndexJob(uri);
|
||||
await this.cancellationService.cancelCloneJob(uri, CancellationReason.REPOSITORY_DELETE);
|
||||
await this.cancellationService.cancelUpdateJob(uri, CancellationReason.REPOSITORY_DELETE);
|
||||
await this.cancellationService.cancelIndexJob(uri, CancellationReason.REPOSITORY_DELETE);
|
||||
|
||||
// 2. Delete git repository and all related data.
|
||||
const repoService = this.repoServiceFactory.newInstance(
|
||||
|
|
|
@ -17,7 +17,7 @@ import { CancellationToken, EsClient, Esqueue } from '../lib/esqueue';
|
|||
import { Logger } from '../log';
|
||||
import { emptyAsyncFunc } from '../test_utils';
|
||||
import { ConsoleLoggerFactory } from '../utils/console_logger_factory';
|
||||
import { CancellationSerivce } from './cancellation_service';
|
||||
import { CancellationReason, CancellationSerivce } from './cancellation_service';
|
||||
import { IndexWorker } from './index_worker';
|
||||
|
||||
const log: Logger = new ConsoleLoggerFactory().getLogger(['test']);
|
||||
|
@ -164,7 +164,7 @@ test('Execute index job and then cancel.', async () => {
|
|||
});
|
||||
|
||||
// Cancel the index job.
|
||||
cToken.cancel();
|
||||
cToken.cancel(CancellationReason.REPOSITORY_DELETE);
|
||||
|
||||
expect(cancelIndexJobSpy.calledOnce).toBeTruthy();
|
||||
expect(getSpy.calledOnce).toBeTruthy();
|
||||
|
|
|
@ -23,7 +23,7 @@ import { Logger } from '../log';
|
|||
import { RepositoryObjectClient } from '../search';
|
||||
import { aggregateIndexStats } from '../utils/index_stats_aggregator';
|
||||
import { AbstractWorker } from './abstract_worker';
|
||||
import { CancellationSerivce } from './cancellation_service';
|
||||
import { CancellationReason, CancellationSerivce } from './cancellation_service';
|
||||
import { Job } from './job';
|
||||
|
||||
export class IndexWorker extends AbstractWorker {
|
||||
|
@ -84,7 +84,7 @@ export class IndexWorker extends AbstractWorker {
|
|||
|
||||
// Binding the index cancellation logic
|
||||
let cancelled = false;
|
||||
this.cancellationService.cancelIndexJob(uri);
|
||||
this.cancellationService.cancelIndexJob(uri, CancellationReason.NEW_JOB_OVERRIDEN);
|
||||
const indexPromises: Array<Promise<IndexStats>> = this.indexerFactories.map(
|
||||
async (indexerFactory: IndexerFactory, index: number) => {
|
||||
const indexer = await indexerFactory.create(uri, revision, enforceReindex);
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
import sinon from 'sinon';
|
||||
|
||||
import { EsClient, Esqueue } from '../lib/esqueue';
|
||||
import { Repository } from '../../model';
|
||||
import { Repository, UpdateWorkerResult } from '../../model';
|
||||
import { DiskWatermarkService } from '../disk_watermark';
|
||||
import { GitOperations } from '../git_operations';
|
||||
import { Logger } from '../log';
|
||||
|
@ -15,7 +15,7 @@ import { RepositoryServiceFactory } from '../repository_service_factory';
|
|||
import { ServerOptions } from '../server_options';
|
||||
import { emptyAsyncFunc } from '../test_utils';
|
||||
import { ConsoleLoggerFactory } from '../utils/console_logger_factory';
|
||||
import { CancellationSerivce } from './cancellation_service';
|
||||
import { CancellationReason, CancellationSerivce } from './cancellation_service';
|
||||
import { UpdateWorker } from './update_worker';
|
||||
|
||||
const log: Logger = new ConsoleLoggerFactory().getLogger(['test']);
|
||||
|
@ -141,6 +141,7 @@ test('On update job completed because of cancellation ', async () => {
|
|||
} as any) as Repository,
|
||||
// Update job is done because of cancellation.
|
||||
cancelled: true,
|
||||
cancelledReason: CancellationReason.REPOSITORY_DELETE,
|
||||
}
|
||||
);
|
||||
|
||||
|
@ -149,7 +150,7 @@ test('On update job completed because of cancellation ', async () => {
|
|||
expect(updateSpy.notCalled).toBeTruthy();
|
||||
});
|
||||
|
||||
test('Execute update job failed because of low disk watermark ', async () => {
|
||||
test('Execute update job failed because of low available disk space', async () => {
|
||||
// Setup RepositoryService
|
||||
const updateSpy = sinon.spy();
|
||||
const repoService = {
|
||||
|
@ -178,6 +179,7 @@ test('Execute update job failed because of low disk watermark ', async () => {
|
|||
const isLowWatermarkSpy = sinon.stub().resolves(true);
|
||||
const diskWatermarkService: any = {
|
||||
isLowWatermark: isLowWatermarkSpy,
|
||||
diskWatermarkViolationMessage: sinon.stub().returns('No enough disk space'),
|
||||
};
|
||||
|
||||
const updateWorker = new UpdateWorker(
|
||||
|
@ -199,14 +201,19 @@ test('Execute update job failed because of low disk watermark ', async () => {
|
|||
diskWatermarkService as DiskWatermarkService
|
||||
);
|
||||
|
||||
let res: UpdateWorkerResult = {
|
||||
uri: 'mockrepo',
|
||||
branch: 'mockbranch',
|
||||
revision: 'mockrevision',
|
||||
};
|
||||
try {
|
||||
await updateWorker.executeJob({
|
||||
res = (await updateWorker.executeJob({
|
||||
payload: {
|
||||
uri: 'mockrepo',
|
||||
},
|
||||
options: {},
|
||||
timestamp: 0,
|
||||
});
|
||||
})) as UpdateWorkerResult;
|
||||
// This step should not be touched.
|
||||
expect(false).toBeTruthy();
|
||||
} catch (error) {
|
||||
|
@ -215,9 +222,31 @@ test('Execute update job failed because of low disk watermark ', async () => {
|
|||
expect(newInstanceSpy.notCalled).toBeTruthy();
|
||||
expect(updateSpy.notCalled).toBeTruthy();
|
||||
}
|
||||
|
||||
expect(res.cancelled).toBeTruthy();
|
||||
expect(res.cancelledReason).toEqual(CancellationReason.LOW_DISK_SPACE);
|
||||
|
||||
const onJobExecutionErrorSpy = sinon.spy();
|
||||
updateWorker.onJobExecutionError = onJobExecutionErrorSpy;
|
||||
|
||||
await updateWorker.onJobCompleted(
|
||||
{
|
||||
payload: {
|
||||
uri: 'mockrepo',
|
||||
},
|
||||
options: {},
|
||||
timestamp: 0,
|
||||
},
|
||||
res
|
||||
);
|
||||
|
||||
expect(onJobExecutionErrorSpy.calledOnce).toBeTruthy();
|
||||
// Non of the follow up steps of a normal complete job should not be called
|
||||
// because the job is going to be forwarded as execution error.
|
||||
expect(updateSpy.notCalled).toBeTruthy();
|
||||
});
|
||||
|
||||
test('On update job error or timeout will not persis error', async () => {
|
||||
test('On update job error or timeout will not persist as error', async () => {
|
||||
// Setup EsClient
|
||||
const esUpdateSpy = sinon.spy();
|
||||
esClient.update = esUpdateSpy;
|
||||
|
|
|
@ -12,7 +12,7 @@ import { Logger } from '../log';
|
|||
import { RepositoryServiceFactory } from '../repository_service_factory';
|
||||
import { ServerOptions } from '../server_options';
|
||||
import { AbstractGitWorker } from './abstract_git_worker';
|
||||
import { CancellationSerivce } from './cancellation_service';
|
||||
import { CancellationReason, CancellationSerivce } from './cancellation_service';
|
||||
import { Job } from './job';
|
||||
|
||||
export class UpdateWorker extends AbstractGitWorker {
|
||||
|
@ -32,7 +32,10 @@ export class UpdateWorker extends AbstractGitWorker {
|
|||
}
|
||||
|
||||
public async executeJob(job: Job) {
|
||||
await super.executeJob(job);
|
||||
const superRes = await super.executeJob(job);
|
||||
if (superRes.cancelled) {
|
||||
return superRes;
|
||||
}
|
||||
|
||||
const { payload, cancellationToken } = job;
|
||||
const repo: Repository = payload;
|
||||
|
@ -45,20 +48,34 @@ export class UpdateWorker extends AbstractGitWorker {
|
|||
);
|
||||
|
||||
// Try to cancel any existing update job for this repository.
|
||||
this.cancellationService.cancelUpdateJob(repo.uri);
|
||||
this.cancellationService.cancelUpdateJob(repo.uri, CancellationReason.NEW_JOB_OVERRIDEN);
|
||||
|
||||
let cancelled = false;
|
||||
let cancelledReason;
|
||||
if (cancellationToken) {
|
||||
cancellationToken.on(() => {
|
||||
cancellationToken.on((reason: string) => {
|
||||
cancelled = true;
|
||||
cancelledReason = reason;
|
||||
});
|
||||
}
|
||||
|
||||
const updateJobPromise = repoService.update(repo, () => {
|
||||
const updateJobPromise = repoService.update(repo, async () => {
|
||||
if (cancelled) {
|
||||
// return false to stop the clone progress
|
||||
// return false to stop the update progress
|
||||
return false;
|
||||
}
|
||||
|
||||
// Keep an eye on the disk usage during update in case it goes above the
|
||||
// disk watermark config.
|
||||
if (await this.watermarkService.isLowWatermark()) {
|
||||
// Cancel this update job
|
||||
if (cancellationToken) {
|
||||
cancellationToken.cancel(CancellationReason.LOW_DISK_SPACE);
|
||||
}
|
||||
// return false to stop the update progress
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
});
|
||||
|
||||
|
@ -69,11 +86,21 @@ export class UpdateWorker extends AbstractGitWorker {
|
|||
updateJobPromise
|
||||
);
|
||||
}
|
||||
|
||||
return await updateJobPromise;
|
||||
const res = await updateJobPromise;
|
||||
return {
|
||||
...res,
|
||||
cancelled,
|
||||
cancelledReason,
|
||||
};
|
||||
}
|
||||
|
||||
public async onJobCompleted(job: Job, res: CloneWorkerResult) {
|
||||
if (res.cancelled) {
|
||||
await this.onJobCancelled(job, res.cancelledReason);
|
||||
// Skip updating job progress if the job is done because of cancellation.
|
||||
return;
|
||||
}
|
||||
|
||||
this.log.info(`Update job done for ${job.payload.uri}`);
|
||||
return await super.onJobCompleted(job, res);
|
||||
}
|
||||
|
|
|
@ -22,8 +22,11 @@ import {
|
|||
import { Logger } from './log';
|
||||
|
||||
// Return false to stop the clone progress. Return true to keep going;
|
||||
export type CloneProgressHandler = (progress: number, cloneProgress?: CloneProgress) => boolean;
|
||||
export type UpdateProgressHandler = () => boolean;
|
||||
export type CloneProgressHandler = (
|
||||
progress: number,
|
||||
cloneProgress?: CloneProgress
|
||||
) => Promise<boolean>;
|
||||
export type UpdateProgressHandler = () => Promise<boolean>;
|
||||
|
||||
const GIT_FETCH_PROGRESS_CANCEL = -1;
|
||||
// TODO: Cannot directly access Git.Error.CODE.EUSER (-7). Investigate why.
|
||||
|
@ -127,10 +130,18 @@ export class RepositoryService {
|
|||
let repo: Git.Repository | undefined;
|
||||
try {
|
||||
repo = await Git.Repository.open(localPath);
|
||||
let lastProgressUpdate = moment();
|
||||
const cbs: RemoteCallbacks = {
|
||||
transferProgress: (_: any) => {
|
||||
transferProgress: async (_: any) => {
|
||||
// Update progress update throttling.
|
||||
const now = moment();
|
||||
if (now.diff(lastProgressUpdate) < this.PROGRESS_UPDATE_THROTTLING_FREQ_MS) {
|
||||
return 0;
|
||||
}
|
||||
lastProgressUpdate = now;
|
||||
|
||||
if (handler) {
|
||||
const resumeUpdate = handler();
|
||||
const resumeUpdate = await handler();
|
||||
if (!resumeUpdate) {
|
||||
return GIT_FETCH_PROGRESS_CANCEL;
|
||||
}
|
||||
|
@ -219,6 +230,7 @@ export class RepositoryService {
|
|||
throw SSH_AUTH_ERROR;
|
||||
}
|
||||
|
||||
private PROGRESS_UPDATE_THROTTLING_FREQ_MS = 1000;
|
||||
private async doClone(
|
||||
repo: Repository,
|
||||
localPath: string,
|
||||
|
@ -228,10 +240,10 @@ export class RepositoryService {
|
|||
try {
|
||||
let lastProgressUpdate = moment();
|
||||
const cbs: RemoteCallbacks = {
|
||||
transferProgress: (stats: any) => {
|
||||
transferProgress: async (stats: any) => {
|
||||
// Clone progress update throttling.
|
||||
const now = moment();
|
||||
if (now.diff(lastProgressUpdate) < 1000) {
|
||||
if (now.diff(lastProgressUpdate) < this.PROGRESS_UPDATE_THROTTLING_FREQ_MS) {
|
||||
return 0;
|
||||
}
|
||||
lastProgressUpdate = now;
|
||||
|
@ -250,7 +262,7 @@ export class RepositoryService {
|
|||
indexedDeltas: stats.indexedDeltas(),
|
||||
receivedBytes: stats.receivedBytes(),
|
||||
};
|
||||
const resumeClone = handler(progress, cloneProgress);
|
||||
const resumeClone = await handler(progress, cloneProgress);
|
||||
if (!resumeClone) {
|
||||
return GIT_FETCH_PROGRESS_CANCEL;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue