[Code] Nodegit clone/update job cancellation (#37755) (#37795)

* [Code] Nodegit clone/update job cancellation

* Update tests
This commit is contained in:
Mengwei Ding 2019-06-01 14:34:50 -07:00 committed by GitHub
parent 78aa857202
commit fa94efb9f8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 271 additions and 50 deletions

View file

@ -72,6 +72,7 @@ export enum FileTreeItemType {
/**
 * Base shape of the result a worker hands to its `onJobCompleted` callback.
 */
export interface WorkerResult {
// URI carried by the result; the git clone/update results in this change fill it with the repository uri.
uri: string;
// True when the job finished because it was cancelled. `onJobCompleted` returns
// early in that case and does not mark the job progress as completed.
cancelled?: boolean;
}
// TODO(mengwei): create a AbstractGitWorkerResult since we now have an

View file

@ -15,8 +15,8 @@ import sinon from 'sinon';
import { Repository } from '../../model';
import { EsClient, Esqueue } from '../lib/esqueue';
import { Logger } from '../log';
import { CloneWorker } from '../queue';
import { IndexWorker } from '../queue';
import { CloneWorker, IndexWorker } from '../queue';
import { CancellationSerivce } from '../queue/cancellation_service';
import { RepositoryServiceFactory } from '../repository_service_factory';
import { createTestServerOption, emptyAsyncFunc } from '../test_utils';
import { ConsoleLoggerFactory } from '../utils/console_logger_factory';
@ -93,13 +93,24 @@ describe('clone_worker_tests', () => {
const newInstanceSpy = sinon.fake.returns(repoService);
repoServiceFactory.newInstance = newInstanceSpy;
// Setup CancellationService
const cancelCloneJobSpy = sinon.spy();
const registerCloneJobTokenSpy = sinon.spy();
const cancellationService: any = {
cancelCloneJob: emptyAsyncFunc,
registerCloneJobToken: emptyAsyncFunc,
};
cancellationService.cancelCloneJob = cancelCloneJobSpy;
cancellationService.registerCloneJobToken = registerCloneJobTokenSpy;
const cloneWorker = new CloneWorker(
esQueue as Esqueue,
log,
{} as EsClient,
serverOptions,
{} as IndexWorker,
(repoServiceFactory as any) as RepositoryServiceFactory
(repoServiceFactory as any) as RepositoryServiceFactory,
cancellationService as CancellationSerivce
);
await cloneWorker.executeJob({
@ -129,13 +140,24 @@ describe('clone_worker_tests', () => {
};
esClient.update = updateSpy;
// Setup CancellationService
const cancelCloneJobSpy = sinon.spy();
const registerCloneJobTokenSpy = sinon.spy();
const cancellationService: any = {
cancelCloneJob: emptyAsyncFunc,
registerCloneJobToken: emptyAsyncFunc,
};
cancellationService.cancelCloneJob = cancelCloneJobSpy;
cancellationService.registerCloneJobToken = registerCloneJobTokenSpy;
const cloneWorker = new CloneWorker(
esQueue as Esqueue,
log,
esClient as EsClient,
serverOptions,
(indexWorker as any) as IndexWorker,
{} as RepositoryServiceFactory
{} as RepositoryServiceFactory,
cancellationService as CancellationSerivce
);
await cloneWorker.onJobCompleted(
@ -173,13 +195,24 @@ describe('clone_worker_tests', () => {
};
esClient.index = indexSpy;
// Setup CancellationService
const cancelCloneJobSpy = sinon.spy();
const registerCloneJobTokenSpy = sinon.spy();
const cancellationService: any = {
cancelCloneJob: emptyAsyncFunc,
registerCloneJobToken: emptyAsyncFunc,
};
cancellationService.cancelCloneJob = cancelCloneJobSpy;
cancellationService.registerCloneJobToken = registerCloneJobTokenSpy;
const cloneWorker = new CloneWorker(
esQueue as Esqueue,
log,
(esClient as any) as EsClient,
serverOptions,
{} as IndexWorker,
{} as RepositoryServiceFactory
{} as RepositoryServiceFactory,
cancellationService as CancellationSerivce
);
await cloneWorker.onJobEnqueued({
@ -209,13 +242,24 @@ describe('clone_worker_tests', () => {
const newInstanceSpy = sinon.fake.returns(repoService);
repoServiceFactory.newInstance = newInstanceSpy;
// Setup CancellationService
const cancelCloneJobSpy = sinon.spy();
const registerCloneJobTokenSpy = sinon.spy();
const cancellationService: any = {
cancelCloneJob: emptyAsyncFunc,
registerCloneJobToken: emptyAsyncFunc,
};
cancellationService.cancelCloneJob = cancelCloneJobSpy;
cancellationService.registerCloneJobToken = registerCloneJobTokenSpy;
const cloneWorker = new CloneWorker(
esQueue as Esqueue,
log,
{} as EsClient,
serverOptions,
{} as IndexWorker,
(repoServiceFactory as any) as RepositoryServiceFactory
(repoServiceFactory as any) as RepositoryServiceFactory,
cancellationService as CancellationSerivce
);
const result1 = await cloneWorker.executeJob({

View file

@ -17,7 +17,6 @@ import { ConsoleLogger } from '../utils/console_logger';
describe('repository service test', () => {
const log = new ConsoleLogger();
const baseDir = fs.mkdtempSync(path.join(os.tmpdir(), 'code_test'));
log.debug(baseDir);
const repoDir = path.join(baseDir, 'repo');
const credsDir = path.join(baseDir, 'credentials');
// @ts-ignore

View file

@ -216,7 +216,8 @@ async function initCodeNode(server: Server, serverOptions: ServerOptions, log: L
esClient,
serverOptions,
indexWorker,
repoServiceFactory
repoServiceFactory,
cancellationService
).bind();
const deleteWorker = new DeleteWorker(
queue,
@ -232,7 +233,8 @@ async function initCodeNode(server: Server, serverOptions: ServerOptions, log: L
log,
esClient,
serverOptions,
repoServiceFactory
repoServiceFactory,
cancellationService
).bind();
// Initialize schedulers.

View file

@ -34,6 +34,10 @@ export abstract class AbstractGitWorker extends AbstractWorker {
}
public async onJobCompleted(job: Job, res: CloneWorkerResult) {
if (res.cancelled) {
// Skip updating job progress if the job is done because of cancellation.
return;
}
await super.onJobCompleted(job, res);
// Update the default branch.
@ -85,9 +89,9 @@ export abstract class AbstractGitWorker extends AbstractWorker {
try {
return await this.objectClient.updateRepositoryGitStatus(uri, p);
} catch (err) {
// This is a warning since it's not blocking anything.
this.log.warn(`Update git clone progress error.`);
this.log.warn(err);
// Do nothing here since it's not blocking anything.
// this.log.warn(`Update git clone progress error.`);
// this.log.warn(err);
}
}
}

View file

@ -106,12 +106,16 @@ export abstract class AbstractWorker implements Worker {
return await this.updateProgress(job, WorkerReservedProgress.INIT);
}
public async onJobCompleted(job: Job, res: any) {
public async onJobCompleted(job: Job, res: WorkerResult) {
this.log.info(
`${this.id} job completed with result ${JSON.stringify(
res
)} in ${this.workerTaskDurationSeconds(job)} seconds.`
);
if (res.cancelled) {
// Skip updating job progress if the job is done because of cancellation.
return;
}
return await this.updateProgress(job, WorkerReservedProgress.COMPLETED);
}

View file

@ -8,13 +8,32 @@ import { RepositoryUri } from '../../model';
import { CancellationToken } from '../lib/esqueue';
export class CancellationSerivce {
// TODO: Add clone/update cancellation map.
private cloneCancellationMap: Map<RepositoryUri, CancellationToken>;
private updateCancellationMap: Map<RepositoryUri, CancellationToken>;
private indexCancellationMap: Map<RepositoryUri, CancellationToken>;
/**
 * Starts with one empty token registry per job kind (clone, update, index),
 * each keyed by repository URI.
 */
constructor() {
  const emptyRegistry = () => new Map<RepositoryUri, CancellationToken>();
  this.cloneCancellationMap = emptyRegistry();
  this.updateCancellationMap = emptyRegistry();
  this.indexCancellationMap = emptyRegistry();
}
/**
 * Cancels the clone job registered for `repoUri`, if there is one, and
 * forgets its token. Does nothing when no clone token is registered.
 */
public cancelCloneJob(repoUri: RepositoryUri) {
  const registeredToken = this.cloneCancellationMap.get(repoUri);
  if (!registeredToken) {
    return;
  }
  registeredToken.cancel();
  this.cloneCancellationMap.delete(repoUri);
}
/**
 * Cancels the update job registered for `repoUri`, if there is one, and
 * forgets its token. Does nothing when no update token is registered.
 */
public cancelUpdateJob(repoUri: RepositoryUri) {
  const registeredToken = this.updateCancellationMap.get(repoUri);
  if (!registeredToken) {
    return;
  }
  registeredToken.cancel();
  this.updateCancellationMap.delete(repoUri);
}
public cancelIndexJob(repoUri: RepositoryUri) {
const token = this.indexCancellationMap.get(repoUri);
if (token) {
@ -23,6 +42,22 @@ export class CancellationSerivce {
}
}
/**
 * Records `cancellationToken` as the active clone token for `repoUri`.
 * A token already registered for the same repository is cancelled first,
 * then replaced.
 */
public registerCloneJobToken(repoUri: RepositoryUri, cancellationToken: CancellationToken) {
  const registry = this.cloneCancellationMap;
  const supersededToken = registry.get(repoUri);
  if (supersededToken) {
    supersededToken.cancel();
  }
  registry.set(repoUri, cancellationToken);
}
/**
 * Records `cancellationToken` as the active update token for `repoUri`.
 * A token already registered for the same repository is cancelled first,
 * then replaced.
 */
public registerUpdateJobToken(repoUri: RepositoryUri, cancellationToken: CancellationToken) {
  const registry = this.updateCancellationMap;
  const supersededToken = registry.get(repoUri);
  if (supersededToken) {
    supersededToken.cancel();
  }
  registry.set(repoUri, cancellationToken);
}
public registerIndexJobToken(repoUri: RepositoryUri, cancellationToken: CancellationToken) {
const token = this.indexCancellationMap.get(repoUri);
if (token) {

View file

@ -19,6 +19,7 @@ import { Logger } from '../log';
import { RepositoryServiceFactory } from '../repository_service_factory';
import { ServerOptions } from '../server_options';
import { AbstractGitWorker } from './abstract_git_worker';
import { CancellationSerivce } from './cancellation_service';
import { IndexWorker } from './index_worker';
import { Job } from './job';
@ -31,13 +32,15 @@ export class CloneWorker extends AbstractGitWorker {
protected readonly client: EsClient,
protected readonly serverOptions: ServerOptions,
private readonly indexWorker: IndexWorker,
private readonly repoServiceFactory: RepositoryServiceFactory
private readonly repoServiceFactory: RepositoryServiceFactory,
private readonly cancellationService: CancellationSerivce
) {
super(queue, log, client, serverOptions);
}
public async executeJob(job: Job) {
const { url } = job.payload;
const { payload, cancellationToken } = job;
const { url } = payload;
try {
validateGitUrl(
url,
@ -62,15 +65,36 @@ export class CloneWorker extends AbstractGitWorker {
this.serverOptions.security.enableGitCertCheck
);
const repo = RepositoryUtils.buildRepository(url);
// Try to cancel any existing clone job for this repository.
this.cancellationService.cancelCloneJob(repo.uri);
let cancelled = false;
if (cancellationToken) {
cancellationToken.on(() => {
cancelled = true;
});
this.cancellationService.registerCloneJobToken(repo.uri, cancellationToken);
}
return await repoService.clone(repo, (progress: number, cloneProgress?: CloneProgress) => {
if (cancelled) {
// return false to stop the clone progress
return false;
}
// For clone job payload, it only has the url. Populate back the
// repository uri before update progress.
job.payload.uri = repo.uri;
this.updateProgress(job, progress, undefined, cloneProgress);
return true;
});
}
public async onJobCompleted(job: Job, res: CloneWorkerResult) {
if (res.cancelled) {
// Skip updating job progress if the job is done because of cancellation.
return;
}
this.log.info(`Clone job done for ${res.repo.uri}`);
// For clone job payload, it only has the url. Populate back the
// repository uri.

View file

@ -41,10 +41,16 @@ test('Execute delete job.', async () => {
// Setup CancellationService
const cancelIndexJobSpy = sinon.spy();
const cancelCloneJobSpy = sinon.spy();
const cancelUpdateJobSpy = sinon.spy();
const cancellationService = {
cancelCloneJob: emptyAsyncFunc,
cancelUpdateJob: emptyAsyncFunc,
cancelIndexJob: emptyAsyncFunc,
};
cancellationService.cancelIndexJob = cancelIndexJobSpy;
cancellationService.cancelCloneJob = cancelCloneJobSpy;
cancellationService.cancelUpdateJob = cancelUpdateJobSpy;
// Setup EsClient
const deleteSpy = sinon.fake.returns(Promise.resolve());
@ -85,6 +91,8 @@ test('Execute delete job.', async () => {
});
expect(cancelIndexJobSpy.calledOnce).toBeTruthy();
expect(cancelCloneJobSpy.calledOnce).toBeTruthy();
expect(cancelUpdateJobSpy.calledOnce).toBeTruthy();
expect(newInstanceSpy.calledOnce).toBeTruthy();
expect(removeSpy.calledOnce).toBeTruthy();

View file

@ -40,7 +40,8 @@ export class DeleteWorker extends AbstractWorker {
const { uri } = job.payload;
// 1. Cancel running workers
// TODO: Add support for clone/update worker.
this.cancellationService.cancelCloneJob(uri);
this.cancellationService.cancelUpdateJob(uri);
this.cancellationService.cancelIndexJob(uri);
// 2. Delete repository on local fs.

View file

@ -84,6 +84,7 @@ export class IndexWorker extends AbstractWorker {
}
// Binding the index cancellation logic
let cancelled = false;
this.cancellationService.cancelIndexJob(uri);
const indexPromises: Array<Promise<IndexStats>> = this.indexerFactories.map(
async (indexerFactory: IndexerFactory, index: number) => {
@ -96,6 +97,7 @@ export class IndexWorker extends AbstractWorker {
if (cancellationToken) {
cancellationToken.on(() => {
indexer.cancel();
cancelled = true;
});
this.cancellationService.registerIndexJobToken(uri, cancellationToken);
}
@ -108,6 +110,7 @@ export class IndexWorker extends AbstractWorker {
uri,
revision,
stats: aggregateIndexStats(stats),
cancelled,
};
this.log.info(`Index worker finished with stats: ${JSON.stringify([...res.stats])}`);
return res;

View file

@ -5,12 +5,14 @@
*/
import sinon from 'sinon';
import { EsClient, Esqueue } from '../lib/esqueue';
import { Logger } from '../log';
import { RepositoryServiceFactory } from '../repository_service_factory';
import { ServerOptions } from '../server_options';
import { emptyAsyncFunc } from '../test_utils';
import { ConsoleLoggerFactory } from '../utils/console_logger_factory';
import { CancellationSerivce } from './cancellation_service';
import { UpdateWorker } from './update_worker';
const log: Logger = new ConsoleLoggerFactory().getLogger(['test']);
@ -37,6 +39,16 @@ test('Execute update job', async () => {
const newInstanceSpy = sinon.fake.returns(repoService);
repoServiceFactory.newInstance = newInstanceSpy;
// Setup CancellationService
const cancelUpdateJobSpy = sinon.spy();
const registerUpdateJobTokenSpy = sinon.spy();
const cancellationService: any = {
cancelUpdateJob: emptyAsyncFunc,
registerUpdateJobToken: emptyAsyncFunc,
};
cancellationService.cancelUpdateJob = cancelUpdateJobSpy;
cancellationService.registerUpdateJobToken = registerUpdateJobTokenSpy;
const updateWorker = new UpdateWorker(
esQueue as Esqueue,
log,
@ -46,7 +58,8 @@ test('Execute update job', async () => {
enableGitCertCheck: false,
},
} as ServerOptions,
(repoServiceFactory as any) as RepositoryServiceFactory
(repoServiceFactory as any) as RepositoryServiceFactory,
cancellationService as CancellationSerivce
);
await updateWorker.executeJob({

View file

@ -10,6 +10,7 @@ import { Logger } from '../log';
import { RepositoryServiceFactory } from '../repository_service_factory';
import { ServerOptions } from '../server_options';
import { AbstractGitWorker } from './abstract_git_worker';
import { CancellationSerivce } from './cancellation_service';
import { Job } from './job';
export class UpdateWorker extends AbstractGitWorker {
@ -20,13 +21,15 @@ export class UpdateWorker extends AbstractGitWorker {
protected readonly log: Logger,
protected readonly client: EsClient,
protected readonly serverOptions: ServerOptions,
protected readonly repoServiceFactory: RepositoryServiceFactory
protected readonly repoServiceFactory: RepositoryServiceFactory,
private readonly cancellationService: CancellationSerivce
) {
super(queue, log, client, serverOptions);
}
public async executeJob(job: Job) {
const repo: Repository = job.payload;
const { payload, cancellationToken } = job;
const repo: Repository = payload;
this.log.info(`Execute update job for ${repo.uri}`);
const repoService = this.repoServiceFactory.newInstance(
this.serverOptions.repoPath,
@ -34,7 +37,25 @@ export class UpdateWorker extends AbstractGitWorker {
this.log,
this.serverOptions.security.enableGitCertCheck
);
return await repoService.update(repo);
// Try to cancel any existing update job for this repository.
this.cancellationService.cancelUpdateJob(repo.uri);
let cancelled = false;
if (cancellationToken) {
cancellationToken.on(() => {
cancelled = true;
});
this.cancellationService.registerUpdateJobToken(repo.uri, cancellationToken);
}
return await repoService.update(repo, () => {
if (cancelled) {
// return false to stop the update progress
return false;
}
return true;
});
}
public async onJobCompleted(job: Job, res: CloneWorkerResult) {

View file

@ -8,7 +8,9 @@ import Git, { RemoteCallbacks } from '@elastic/nodegit';
import del from 'del';
import fs from 'fs';
import mkdirp from 'mkdirp';
import moment from 'moment';
import path from 'path';
import { RepositoryUtils } from '../common/repository_utils';
import {
CloneProgress,
@ -19,10 +21,24 @@ import {
} from '../model';
import { Logger } from './log';
export type CloneProgressHandler = (progress: number, cloneProgress?: CloneProgress) => void;
// Return false to stop the clone progress. Return true to keep going.
export type CloneProgressHandler = (progress: number, cloneProgress?: CloneProgress) => boolean;
export type UpdateProgressHandler = () => boolean;
// Value the transfer-progress callbacks return to make the fetch stop.
const GIT_FETCH_PROGRESS_CANCEL = -1;
// TODO: Cannot directly access Git.Error.CODE.EUSER (-7). Investigate why.
const NODEGIT_CALLBACK_RETURN_VALUE_ERROR = -7;
// Error text matched to recognise a fetch aborted by the progress callback.
const GIT_INDEXER_PROGRESS_CALLBACK_RETURN_VALUE_ERROR_MSG = `indexer progress callback returned ${GIT_FETCH_PROGRESS_CANCEL}`;
const SSH_AUTH_ERROR = new Error('Failed to authenticate SSH session');

/**
 * Tells whether a caught value represents an intentional cancellation, i.e.
 * the progress callback asked the fetch to stop.
 *
 * A value counts as a cancellation when either its `message` contains the
 * "indexer progress callback returned -1" text or its `errno` equals
 * NODEGIT_CALLBACK_RETURN_VALUE_ERROR (-7).
 *
 * @param error The value caught from a clone/update attempt. It may be
 *   anything: an Error, a plain object carrying only `errno`, a string, or
 *   null/undefined.
 * @returns `true` for a cancellation, `false` otherwise. Always a boolean and
 *   never throws, so it is safe to call first inside a `catch` handler.
 */
function isCancelled(error: unknown): boolean {
  if (typeof error !== 'object' || error === null) {
    return false;
  }
  const { message, errno } = error as { message?: unknown; errno?: unknown };
  // Only inspect the message when it really is a string; a value carrying
  // just `errno` must still reach the errno comparison instead of throwing.
  const cancelledByMessage =
    typeof message === 'string' &&
    message.includes(GIT_INDEXER_PROGRESS_CALLBACK_RETURN_VALUE_ERROR_MSG);
  return cancelledByMessage || errno === NODEGIT_CALLBACK_RETURN_VALUE_ERROR;
}
// This is the service for any kind of repository handling, e.g. clone, update, delete, etc.
export class RepositoryService {
constructor(
@ -92,18 +108,34 @@ export class RepositoryService {
throw error;
}
}
public async update(repo: Repository): Promise<UpdateWorkerResult> {
public async update(
repo: Repository,
handler?: UpdateProgressHandler
): Promise<UpdateWorkerResult> {
if (repo.protocol === 'ssh') {
return await this.tryWithKeys(key => this.doUpdate(repo.uri, key));
return await this.tryWithKeys(key => this.doUpdate(repo.uri, key, handler));
} else {
return await this.doUpdate(repo.uri);
return await this.doUpdate(repo.uri, /* key */ undefined, handler);
}
}
public async doUpdate(uri: string, key?: string): Promise<UpdateWorkerResult> {
public async doUpdate(
uri: string,
key?: string,
handler?: UpdateProgressHandler
): Promise<UpdateWorkerResult> {
const localPath = RepositoryUtils.repositoryLocalPath(this.repoVolPath, uri);
try {
const repo = await Git.Repository.open(localPath);
const cbs: RemoteCallbacks = {
transferProgress: (_: any) => {
if (handler) {
const resumeUpdate = handler();
if (!resumeUpdate) {
return GIT_FETCH_PROGRESS_CANCEL;
}
}
return 0;
},
credentials: this.credentialFunc(key),
};
// Ignore cert check on testing environment.
@ -133,7 +165,19 @@ export class RepositoryService {
revision: headCommit.sha(),
};
} catch (error) {
if (error.message && error.message.startsWith(SSH_AUTH_ERROR.message)) {
if (isCancelled(error)) {
// Update job was cancelled intentionally. Do not throw this error.
this.log.info(`Update repository job for ${uri} was cancelled.`);
this.log.debug(
`Update repository job cancellation error: ${JSON.stringify(error, null, 2)}`
);
return {
uri,
branch: '',
revision: '',
cancelled: true,
};
} else if (error.message && error.message.startsWith(SSH_AUTH_ERROR.message)) {
throw SSH_AUTH_ERROR;
} else {
const msg = `update repository ${uri} error: ${error}`;
@ -177,30 +221,37 @@ export class RepositoryService {
keyFile?: string
) {
try {
let lastProgressUpdate = moment();
const cbs: RemoteCallbacks = {
transferProgress: {
// Make the progress update less frequent to avoid too many
// concurrently update of git status in elasticsearch.
throttle: 1000,
callback: (stats: any) => {
if (handler) {
const progress =
(100 * (stats.receivedObjects() + stats.indexedObjects())) /
(stats.totalObjects() * 2);
const cloneProgress = {
isCloned: false,
receivedObjects: stats.receivedObjects(),
indexedObjects: stats.indexedObjects(),
totalObjects: stats.totalObjects(),
localObjects: stats.localObjects(),
totalDeltas: stats.totalDeltas(),
indexedDeltas: stats.indexedDeltas(),
receivedBytes: stats.receivedBytes(),
};
handler(progress, cloneProgress);
transferProgress: (stats: any) => {
// Clone progress update throttling.
const now = moment();
if (now.diff(lastProgressUpdate) < 1000) {
return 0;
}
lastProgressUpdate = now;
if (handler) {
const progress =
(100 * (stats.receivedObjects() + stats.indexedObjects())) /
(stats.totalObjects() * 2);
const cloneProgress = {
isCloned: false,
receivedObjects: stats.receivedObjects(),
indexedObjects: stats.indexedObjects(),
totalObjects: stats.totalObjects(),
localObjects: stats.localObjects(),
totalDeltas: stats.totalDeltas(),
indexedDeltas: stats.indexedDeltas(),
receivedBytes: stats.receivedBytes(),
};
const resumeClone = handler(progress, cloneProgress);
if (!resumeClone) {
return GIT_FETCH_PROGRESS_CANCEL;
}
},
} as any,
}
return 0;
},
credentials: this.credentialFunc(keyFile),
};
// Ignore cert check on testing environment.
@ -234,7 +285,18 @@ export class RepositoryService {
},
};
} catch (error) {
if (error.message && error.message.startsWith(SSH_AUTH_ERROR.message)) {
if (isCancelled(error)) {
// Clone job was cancelled intentionally. Do not throw this error.
this.log.info(`Clone repository job for ${repo.uri} was cancelled.`);
this.log.debug(
`Clone repository job cancellation error: ${JSON.stringify(error, null, 2)}`
);
return {
uri: repo.uri,
repo,
cancelled: true,
};
} else if (error.message && error.message.startsWith(SSH_AUTH_ERROR.message)) {
throw SSH_AUTH_ERROR;
} else {
const msg = `Clone repository from ${repo.url} error.`;