mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 09:19:04 -04:00
* [ML] Start datafeeds from the module setup endpoint * changes based on review * correcting use of filter
This commit is contained in:
parent
4f29b323bf
commit
de11c30ed4
2 changed files with 115 additions and 10 deletions
|
@ -191,7 +191,17 @@ export class DataRecognizer {
|
|||
// takes a module config id, an optional jobPrefix and the request object
|
||||
// creates all of the jobs, datafeeds and savedObjects listed in the module config.
|
||||
// if any of the savedObjects already exist, they will not be overwritten.
|
||||
async setupModuleItems(moduleId, jobPrefix, groups, indexPatternName, query, request) {
|
||||
async setupModuleItems(
|
||||
moduleId,
|
||||
jobPrefix,
|
||||
groups,
|
||||
indexPatternName,
|
||||
query,
|
||||
startDatafeed,
|
||||
start,
|
||||
end,
|
||||
request
|
||||
) {
|
||||
this.savedObjectsClient = request.getSavedObjectsClient();
|
||||
this.indexPatterns = await this.loadIndexPatterns();
|
||||
|
||||
|
@ -235,6 +245,24 @@ export class DataRecognizer {
|
|||
});
|
||||
}
|
||||
saveResults.datafeeds = await this.saveDatafeeds(moduleConfig.datafeeds);
|
||||
|
||||
if (startDatafeed) {
|
||||
const savedDatafeeds = moduleConfig.datafeeds.filter((df) => {
|
||||
const datafeedResult = saveResults.datafeeds.find(d => d.id === df.id);
|
||||
return (datafeedResult !== undefined && datafeedResult.success === true);
|
||||
});
|
||||
|
||||
const startResults = await this.startDatafeeds(savedDatafeeds, start, end);
|
||||
saveResults.datafeeds.forEach((df) => {
|
||||
const startedDatafeed = startResults[df.id];
|
||||
if (startedDatafeed !== undefined) {
|
||||
df.started = startedDatafeed.started;
|
||||
if (startedDatafeed.error !== undefined) {
|
||||
df.error = startedDatafeed.error;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// create the savedObjects
|
||||
|
@ -367,14 +395,11 @@ export class DataRecognizer {
|
|||
// as success: false
|
||||
async saveDatafeeds(datafeeds) {
|
||||
return await Promise.all(datafeeds.map(async (datafeed) => {
|
||||
const datafeedId = datafeed.id;
|
||||
|
||||
try {
|
||||
datafeed.id = datafeedId;
|
||||
await this.saveDatafeed(datafeed);
|
||||
return { id: datafeedId, success: true };
|
||||
return { id: datafeed.id, success: true, started: false };
|
||||
} catch (error) {
|
||||
return { id: datafeedId, success: false, error };
|
||||
return { id: datafeed.id, success: false, started: false, error };
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
@ -384,6 +409,51 @@ export class DataRecognizer {
|
|||
return this.callWithRequest('ml.addDatafeed', { datafeedId, body });
|
||||
}
|
||||
|
||||
async startDatafeeds(datafeeds, start, end) {
|
||||
const results = {};
|
||||
for (const datafeed of datafeeds) {
|
||||
results[datafeed.id] = await this.startDatafeed(datafeed, start, end);
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
async startDatafeed(datafeed, start, end) {
|
||||
const result = { started: false };
|
||||
let opened = false;
|
||||
try {
|
||||
const openResult = await this.callWithRequest('ml.openJob', { jobId: datafeed.config.job_id });
|
||||
opened = openResult.opened;
|
||||
} catch (error) {
|
||||
// if the job is already open, a 409 will be returned.
|
||||
if (error.statusCode === 409) {
|
||||
opened = true;
|
||||
} else {
|
||||
opened = false;
|
||||
result.started = false;
|
||||
result.error = error;
|
||||
}
|
||||
}
|
||||
if (opened) {
|
||||
try {
|
||||
const duration = { start: 0 };
|
||||
if (start !== undefined) {
|
||||
duration.start = start;
|
||||
}
|
||||
if (end !== undefined) {
|
||||
duration.end = end;
|
||||
}
|
||||
|
||||
await this.callWithRequest('ml.startDatafeed', { datafeedId: datafeed.id, ...duration });
|
||||
result.started = true;
|
||||
} catch (error) {
|
||||
result.started = false;
|
||||
result.error = error;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
// merge all of the save results into one result object
|
||||
// which is returned from the endpoint
|
||||
async updateResults(results, saveResults) {
|
||||
|
@ -404,6 +474,7 @@ export class DataRecognizer {
|
|||
saveResults.datafeeds.forEach((d2) => {
|
||||
if (d.id === d2.id) {
|
||||
d.success = d2.success;
|
||||
d.started = d2.started;
|
||||
if (d2.error !== undefined) {
|
||||
d.error = d2.error;
|
||||
}
|
||||
|
|
|
@ -21,9 +21,29 @@ function getModule(callWithRequest, moduleId) {
|
|||
return dr.getModule(moduleId);
|
||||
}
|
||||
|
||||
function saveModuleItems(callWithRequest, moduleId, prefix, groups, indexPatternName, query, request) {
|
||||
function saveModuleItems(
|
||||
callWithRequest,
|
||||
moduleId,
|
||||
prefix,
|
||||
groups,
|
||||
indexPatternName,
|
||||
query,
|
||||
startDatafeed,
|
||||
start,
|
||||
end,
|
||||
request
|
||||
) {
|
||||
const dr = new DataRecognizer(callWithRequest);
|
||||
return dr.setupModuleItems(moduleId, prefix, groups, indexPatternName, query, request);
|
||||
return dr.setupModuleItems(
|
||||
moduleId,
|
||||
prefix,
|
||||
groups,
|
||||
indexPatternName,
|
||||
query,
|
||||
startDatafeed,
|
||||
start,
|
||||
end,
|
||||
request);
|
||||
}
|
||||
|
||||
export function dataRecognizer(server, commonRouteConfig) {
|
||||
|
@ -69,10 +89,24 @@ export function dataRecognizer(server, commonRouteConfig) {
|
|||
prefix,
|
||||
groups,
|
||||
indexPatternName,
|
||||
query
|
||||
query,
|
||||
startDatafeed,
|
||||
start,
|
||||
end
|
||||
} = request.payload;
|
||||
|
||||
return saveModuleItems(callWithRequest, moduleId, prefix, groups, indexPatternName, query, request)
|
||||
return saveModuleItems(
|
||||
callWithRequest,
|
||||
moduleId,
|
||||
prefix,
|
||||
groups,
|
||||
indexPatternName,
|
||||
query,
|
||||
startDatafeed,
|
||||
start,
|
||||
end,
|
||||
request
|
||||
)
|
||||
.then(resp => reply(resp))
|
||||
.catch(resp => reply(wrapError(resp)));
|
||||
},
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue