[Code] refactored the queue to process tasks belong to local node only. (#42708) (#42901)

1. added a `isResourceLocal` method to the CodeServices to check whether
the resource belongs to the local node according to the routing table.
2. each worker only processes jobs that belong to their node.
This commit is contained in:
Yang Yang 2019-08-08 10:54:31 +08:00 committed by GitHub
parent 5f4cc409f3
commit bb6d7f482e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 36 additions and 6 deletions

View file

@ -28,6 +28,10 @@ export class CodeServices {
return this.adapter.locator.locate(req, resource);
}
public isResourceLocal(resource: string): Promise<boolean> {
return this.adapter.locator.isResourceLocal(resource);
}
public serviceFor<def extends ServiceDefinition>(serviceDefinition: def): ServiceMethodMap<def> {
return this.adapter.getService(serviceDefinition);
}

View file

@ -44,5 +44,9 @@ export class LocalHandlerAdapter implements ServiceHandlerAdapter {
async locate(httpRequest: Request, resource: string): Promise<Endpoint> {
return Promise.resolve(new LocalEndpoint(httpRequest, resource));
},
isResourceLocal(resource: string): Promise<boolean> {
return Promise.resolve(true);
},
};
}

View file

@ -37,6 +37,10 @@ export class CodeNodeAdapter implements ServiceHandlerAdapter {
async locate(httpRequest: Request, resource: string): Promise<Endpoint> {
return Promise.resolve(new LocalEndpoint(httpRequest, resource));
},
isResourceLocal(resource: string): Promise<boolean> {
return Promise.resolve(false);
},
};
getService<def extends ServiceDefinition>(serviceDefinition: def): ServiceMethodMap<def> {

View file

@ -14,4 +14,8 @@ export class CodeNodeResourceLocator implements ResourceLocator {
async locate(httpRequest: Request, resource: string): Promise<Endpoint> {
return Promise.resolve(new CodeNodeEndpoint(httpRequest, resource, this.codeNodeUrl));
}
isResourceLocal(resource: string): Promise<boolean> {
return Promise.resolve(false);
}
}

View file

@ -13,4 +13,11 @@ export interface Endpoint {
export interface ResourceLocator {
locate(req: Request, resource: string): Promise<Endpoint>;
/**
* Returns whether the resource resides on the local node. This should support both url and uri of the repository.
*
* @param resource the name of the resource.
*/
isResourceLocal(resource: string): Promise<boolean>;
}

View file

@ -40,7 +40,7 @@ export function initWorkers(
[lspIndexerFactory],
gitOps,
cancellationService
).bind();
).bind(codeServices);
const repoServiceFactory: RepositoryServiceFactory = new RepositoryServiceFactory();
@ -58,7 +58,7 @@ export function initWorkers(
repoServiceFactory,
cancellationService,
watermarkService
).bind();
).bind(codeServices);
const deleteWorker = new DeleteWorker(
queue,
log,
@ -68,7 +68,7 @@ export function initWorkers(
cancellationService,
lspService,
repoServiceFactory
).bind();
).bind(codeServices);
const updateWorker = new UpdateWorker(
queue,
log,
@ -78,7 +78,7 @@ export function initWorkers(
repoServiceFactory,
cancellationService,
watermarkService
).bind();
).bind(codeServices);
codeServices.registerHandler(
RepositoryServiceDefinition,
getRepositoryHandler(cloneWorker, deleteWorker, indexWorker)

View file

@ -36,6 +36,7 @@ export class Worker extends events.EventEmitter {
this.id = puid.generate();
this.queue = queue;
this.client = opts.client || this.queue.client;
this.codeServices = opts.codeServices;
this.jobtype = type;
this.workerFn = workerFn;
this.checkSize = opts.size || 10;
@ -465,7 +466,11 @@ export class Worker extends events.EventEmitter {
if (jobs.length > 0) {
this.debug(`${jobs.length} outstanding jobs returned`);
}
return jobs;
return jobs.filter(async job => {
const payload = job._source.payload.payload;
const repoUrl = payload.uri || payload.url;
return await this.codeServices.isResourceLocal(repoUrl);
});
})
.catch((err) => {
// ignore missing indices errors

View file

@ -16,6 +16,7 @@ import {
import { Logger } from '../log';
import { Job } from './job';
import { Worker } from './worker';
import { CodeServices } from '../distributed/code_services';
export abstract class AbstractWorker implements Worker {
// The id of the worker. Also serves as the id of the job this worker consumes.
@ -69,7 +70,7 @@ export abstract class AbstractWorker implements Worker {
});
}
public bind() {
public bind(codeServices: CodeServices) {
const workerFn = (payload: any, cancellationToken: CancellationToken) => {
const job: Job = {
...payload,
@ -82,6 +83,7 @@ export abstract class AbstractWorker implements Worker {
interval: 5000,
capacity: 5,
intervalErrorMultiplier: 1,
codeServices,
};
const queueWorker = this.queue.registerWorker(this.id, workerFn as any, workerOptions);