mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-25 15:47:23 -04:00
This commit allows for composite aggregations in datafeeds. Composite aggs provide a much better solution for having influencers, partitions, etc. on high volume data. Instead of worrying about long scrolls in the datafeed, the calculation is distributed across cluster via the aggregations. The restrictions for this support are as follows: - The composite aggregation must have EXACTLY one `date_histogram` source - The sub-aggs of the composite aggregation must have a `max` aggregation on the SAME timefield as the aforementioned `date_histogram` source - The composite agg must be the ONLY top level agg and it cannot have a `composite` or `date_histogram` sub-agg - If using a `date_histogram` to bucket time, it cannot have a `composite` sub-agg. - The top-level `composite` agg cannot have a sibling pipeline agg. Pipeline aggregations are supported as a sub-agg (thus a pipeline agg INSIDE the bucket). Some key user interaction differences: - Speed + resources used by the cluster should be controlled by the `size` parameter in the `composite` aggregation. Previously, we said if you are using aggs, use a specific `chunking_config`. But, with composite, that is not necessary. - Users really shouldn't use nested `terms` aggs anylonger. While this is still a "valid" configuration and MAY be desirable for some users (only wanting the top 10 of certain terms), typically when users want influencers, partition fields, etc. they want the ENTIRE population. Previously, this really wasn't possible with aggs, with `composite` it is. - I cannot really think of a typical usecase that SHOULD ever use a multi-bucket aggregation that is NOT supported by composite.
490 lines
16 KiB
Text
490 lines
16 KiB
Text
[role="xpack"]
|
|
[[ml-configuring-aggregation]]
|
|
= Aggregating data for faster performance
|
|
|
|
By default, {dfeeds} fetch data from {es} using search and scroll requests.
|
|
It can be significantly more efficient, however, to aggregate data in {es}
|
|
and to configure your {anomaly-jobs} to analyze aggregated data.
|
|
|
|
One of the benefits of aggregating data this way is that {es} automatically
|
|
distributes these calculations across your cluster. You can then feed this
|
|
aggregated data into the {ml-features} instead of raw results, which
|
|
reduces the volume of data that must be considered while detecting anomalies.
|
|
|
|
TIP: If you use a terms aggregation and the cardinality of a term is high but
|
|
still significantly less than your total number of documents, use
|
|
{ref}/search-aggregations-bucket-composite-aggregation.html[composite aggregations]
|
|
experimental:[Support for composite aggregations inside datafeeds is currently experimental].
|
|
|
|
[discrete]
|
|
[[aggs-limits-dfeeds]]
|
|
== Requirements and limitations
|
|
|
|
There are some limitations to using aggregations in {dfeeds}.
|
|
|
|
Your aggregation must include a `date_histogram` aggregation or a top level `composite` aggregation,
|
|
which in turn must contain a `max` aggregation on the time field.
|
|
This requirement ensures that the aggregated data is a time series and the timestamp
|
|
of each bucket is the time of the last record in the bucket.
|
|
|
|
IMPORTANT: The name of the aggregation and the name of the field that it
|
|
operates on need to match, otherwise the aggregation doesn't work. For example,
|
|
if you use a `max` aggregation on a time field called `responsetime`, the name
|
|
of the aggregation must be also `responsetime`.
|
|
|
|
You must consider the interval of the `date_histogram` or `composite`
|
|
aggregation carefully. The bucket span of your {anomaly-job} must be divisible
|
|
by the value of the `calendar_interval` or `fixed_interval` in your aggregation
|
|
(with no remainder). If you specify a `frequency` for your {dfeed},
|
|
it must also be divisible by this interval. {anomaly-jobs-cap} cannot use
|
|
`date_histogram` or `composite` aggregations with an interval measured in months
|
|
because the length of the month is not fixed; they can use weeks or smaller units.
|
|
|
|
TIP: As a rule of thumb, if your detectors use <<ml-metric-functions,metric>> or
|
|
<<ml-sum-functions,sum>> analytical functions, set the `date_histogram` or `composite`
|
|
aggregation interval to a tenth of the bucket span. This suggestion creates
|
|
finer, more granular time buckets, which are ideal for this type of analysis. If
|
|
your detectors use <<ml-count-functions,count>> or <<ml-rare-functions,rare>>
|
|
functions, set the interval to the same value as the bucket span.
|
|
|
|
If your <<aggs-dfeeds,{dfeed} uses aggregations with nested `terms` aggs>> and
|
|
model plot is not enabled for the {anomaly-job}, neither the **Single Metric
|
|
Viewer** nor the **Anomaly Explorer** can plot and display an anomaly
|
|
chart for the job. In these cases, the charts are not visible and an explanatory
|
|
message is shown.
|
|
|
|
When the aggregation interval of the {dfeed} and the bucket span of the job
|
|
don't match, the values of the chart plotted in both the **Single Metric
|
|
Viewer** and the **Anomaly Explorer** differ from the actual values of the job.
|
|
To avoid this behavior, make sure that the aggregation interval in the {dfeed}
|
|
configuration and the bucket span in the {anomaly-job} configuration have the
|
|
same values.
|
|
|
|
Your {dfeed} can contain multiple aggregations, but only the ones with names
|
|
that match values in the job configuration are fed to the job.
|
|
|
|
[discrete]
|
|
[[aggs-using-date-histogram]]
|
|
=== Including aggregations in {anomaly-jobs}
|
|
|
|
When you create or update an {anomaly-job}, you can include the names of
|
|
aggregations, for example:
|
|
|
|
[source,console]
|
|
----------------------------------
|
|
PUT _ml/anomaly_detectors/farequote
|
|
{
|
|
"analysis_config": {
|
|
"bucket_span": "60m",
|
|
"detectors": [{
|
|
"function": "mean",
|
|
"field_name": "responsetime", <1>
|
|
"by_field_name": "airline" <1>
|
|
}],
|
|
"summary_count_field_name": "doc_count"
|
|
},
|
|
"data_description": {
|
|
"time_field":"time" <1>
|
|
}
|
|
}
|
|
----------------------------------
|
|
// TEST[skip:setup:farequote_data]
|
|
|
|
<1> The `airline`, `responsetime`, and `time` fields are aggregations. Only the
|
|
aggregated fields defined in the `analysis_config` object are analyzed by the
|
|
{anomaly-job}.
|
|
|
|
NOTE: When the `summary_count_field_name` property is set to a non-null value,
|
|
the job expects to receive aggregated input. The property must be set to the
|
|
name of the field that contains the count of raw data points that have been
|
|
aggregated. It applies to all detectors in the job.
|
|
|
|
The aggregations are defined in the {dfeed} as follows:
|
|
|
|
[source,console]
|
|
----------------------------------
|
|
PUT _ml/datafeeds/datafeed-farequote
|
|
{
|
|
"job_id":"farequote",
|
|
"indices": ["farequote"],
|
|
"aggregations": {
|
|
"buckets": {
|
|
"date_histogram": {
|
|
"field": "time",
|
|
"fixed_interval": "360s",
|
|
"time_zone": "UTC"
|
|
},
|
|
"aggregations": {
|
|
"time": { <1>
|
|
"max": {"field": "time"}
|
|
},
|
|
"airline": { <2>
|
|
"terms": {
|
|
"field": "airline",
|
|
"size": 100
|
|
},
|
|
"aggregations": {
|
|
"responsetime": { <3>
|
|
"avg": {
|
|
"field": "responsetime"
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
----------------------------------
|
|
// TEST[skip:setup:farequote_job]
|
|
|
|
<1> The aggregations have names that match the fields that they operate on. The
|
|
`max` aggregation is named `time` and its field also needs to be `time`.
|
|
<2> The `term` aggregation is named `airline` and its field is also named
|
|
`airline`.
|
|
<3> The `avg` aggregation is named `responsetime` and its field is also named
|
|
`responsetime`.
|
|
|
|
TIP: If you are using a `term` aggregation to gather influencer or partition
|
|
field information, consider using a `composite` aggregation. It performs
|
|
better than a `date_histogram` with a nested `term` aggregation and also includes
|
|
all the values of the field instead of the top values per bucket.
|
|
|
|
[discrete]
|
|
[[aggs-using-composite]]
|
|
=== Using composite aggregations in {anomaly-jobs}
|
|
|
|
experimental::[]
|
|
|
|
For `composite` aggregation support, there must be exactly one `date_histogram` value
|
|
source. That value source must not be sorted in descending order. Additional
|
|
`composite` aggregation value sources are allowed, such as `terms`.
|
|
|
|
NOTE: A {dfeed} that uses composite aggregations may not be as performant as datafeeds that use scrolling or
|
|
date histogram aggregations. Composite aggregations are optimized
|
|
for queries that are either `match_all` or `range` filters. Other types of
|
|
queries may cause the `composite` aggregation to be ineffecient.
|
|
|
|
Here is an example that uses a `composite` aggregation instead of a
|
|
`date_histogram`.
|
|
|
|
Assuming the same job configuration as above.
|
|
|
|
[source,console]
|
|
----------------------------------
|
|
PUT _ml/anomaly_detectors/farequote-composite
|
|
{
|
|
"analysis_config": {
|
|
"bucket_span": "60m",
|
|
"detectors": [{
|
|
"function": "mean",
|
|
"field_name": "responsetime",
|
|
"by_field_name": "airline"
|
|
}],
|
|
"summary_count_field_name": "doc_count"
|
|
},
|
|
"data_description": {
|
|
"time_field":"time"
|
|
}
|
|
}
|
|
----------------------------------
|
|
// TEST[skip:setup:farequote_data]
|
|
|
|
This is an example of a datafeed that uses a `composite` aggregation to bucket
|
|
the metrics based on time and terms:
|
|
|
|
[source,console]
|
|
----------------------------------
|
|
PUT _ml/datafeeds/datafeed-farequote-composite
|
|
{
|
|
"job_id": "farequote-composite",
|
|
"indices": [
|
|
"farequote"
|
|
],
|
|
"aggregations": {
|
|
"buckets": {
|
|
"composite": {
|
|
"size": 1000, <1>
|
|
"sources": [
|
|
{
|
|
"time_bucket": { <2>
|
|
"date_histogram": {
|
|
"field": "time",
|
|
"fixed_interval": "360s",
|
|
"time_zone": "UTC"
|
|
}
|
|
}
|
|
},
|
|
{
|
|
"airline": { <3>
|
|
"terms": {
|
|
"field": "airline"
|
|
}
|
|
}
|
|
}
|
|
]
|
|
},
|
|
"aggregations": {
|
|
"time": { <4>
|
|
"max": {
|
|
"field": "time"
|
|
}
|
|
},
|
|
"responsetime": { <5>
|
|
"avg": {
|
|
"field": "responsetime"
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
----------------------------------
|
|
// TEST[skip:setup:farequote_job]
|
|
|
|
<1> Provide the `size` to the composite agg to control how many resources
|
|
are used when aggregating the data. A larger `size` means a faster datafeed but
|
|
more cluster resources are used when searching.
|
|
<2> The required `date_histogram` composite aggregation source. Make sure it
|
|
is named differently than your desired time field.
|
|
<3> Instead of using a regular `term` aggregation, adding a composite
|
|
aggregation `term` source with the name `airline` works. Note its name
|
|
is the same as the field.
|
|
<4> The required `max` aggregation whose name is the time field in the
|
|
job analysis config.
|
|
<5> The `avg` aggregation is named `responsetime` and its field is also named
|
|
`responsetime`.
|
|
|
|
[discrete]
|
|
[[aggs-dfeeds]]
|
|
== Nested aggregations in {dfeeds}
|
|
|
|
{dfeeds-cap} support complex nested aggregations. This example uses the
|
|
`derivative` pipeline aggregation to find the first order derivative of the
|
|
counter `system.network.out.bytes` for each value of the field `beat.name`.
|
|
|
|
NOTE: `derivative` or other pipeline aggregations may not work within `composite`
|
|
aggregations. See
|
|
{ref}/search-aggregations-bucket-composite-aggregation.html#search-aggregations-bucket-composite-aggregation-pipeline-aggregations[composite aggregations and pipeline aggregations].
|
|
|
|
[source,js]
|
|
----------------------------------
|
|
"aggregations": {
|
|
"beat.name": {
|
|
"terms": {
|
|
"field": "beat.name"
|
|
},
|
|
"aggregations": {
|
|
"buckets": {
|
|
"date_histogram": {
|
|
"field": "@timestamp",
|
|
"fixed_interval": "5m"
|
|
},
|
|
"aggregations": {
|
|
"@timestamp": {
|
|
"max": {
|
|
"field": "@timestamp"
|
|
}
|
|
},
|
|
"bytes_out_average": {
|
|
"avg": {
|
|
"field": "system.network.out.bytes"
|
|
}
|
|
},
|
|
"bytes_out_derivative": {
|
|
"derivative": {
|
|
"buckets_path": "bytes_out_average"
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
----------------------------------
|
|
// NOTCONSOLE
|
|
|
|
|
|
[discrete]
|
|
[[aggs-single-dfeeds]]
|
|
== Single bucket aggregations in {dfeeds}
|
|
|
|
{dfeeds-cap} not only supports multi-bucket aggregations, but also single bucket
|
|
aggregations. The following shows two `filter` aggregations, each gathering the
|
|
number of unique entries for the `error` field.
|
|
|
|
[source,js]
|
|
----------------------------------
|
|
{
|
|
"job_id":"servers-unique-errors",
|
|
"indices": ["logs-*"],
|
|
"aggregations": {
|
|
"buckets": {
|
|
"date_histogram": {
|
|
"field": "time",
|
|
"interval": "360s",
|
|
"time_zone": "UTC"
|
|
},
|
|
"aggregations": {
|
|
"time": {
|
|
"max": {"field": "time"}
|
|
}
|
|
"server1": {
|
|
"filter": {"term": {"source": "server-name-1"}},
|
|
"aggregations": {
|
|
"server1_error_count": {
|
|
"value_count": {
|
|
"field": "error"
|
|
}
|
|
}
|
|
}
|
|
},
|
|
"server2": {
|
|
"filter": {"term": {"source": "server-name-2"}},
|
|
"aggregations": {
|
|
"server2_error_count": {
|
|
"value_count": {
|
|
"field": "error"
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
----------------------------------
|
|
// NOTCONSOLE
|
|
|
|
|
|
[discrete]
|
|
[[aggs-define-dfeeds]]
|
|
== Defining aggregations in {dfeeds}
|
|
|
|
When you define an aggregation in a {dfeed}, it must have one of the following forms:
|
|
|
|
When using a `date_histogram` aggregation to bucket by time:
|
|
[source,js]
|
|
----------------------------------
|
|
"aggregations": {
|
|
["bucketing_aggregation": {
|
|
"bucket_agg": {
|
|
...
|
|
},
|
|
"aggregations": {]
|
|
"data_histogram_aggregation": {
|
|
"date_histogram": {
|
|
"field": "time",
|
|
},
|
|
"aggregations": {
|
|
"timestamp": {
|
|
"max": {
|
|
"field": "time"
|
|
}
|
|
},
|
|
[,"<first_term>": {
|
|
"terms":{...
|
|
}
|
|
[,"aggregations" : {
|
|
[<sub_aggregation>]+
|
|
} ]
|
|
}]
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
----------------------------------
|
|
// NOTCONSOLE
|
|
|
|
When using a `composite` aggregation:
|
|
|
|
[source,js]
|
|
----------------------------------
|
|
"aggregations": {
|
|
"composite_agg": {
|
|
"sources": [
|
|
{
|
|
"date_histogram_agg": {
|
|
"field": "time",
|
|
...settings...
|
|
}
|
|
},
|
|
...other valid sources...
|
|
],
|
|
...composite agg settings...,
|
|
"aggregations": {
|
|
"timestamp": {
|
|
"max": {
|
|
"field": "time"
|
|
}
|
|
},
|
|
...other aggregations...
|
|
[
|
|
[,"aggregations" : {
|
|
[<sub_aggregation>]+
|
|
} ]
|
|
}]
|
|
}
|
|
}
|
|
}
|
|
----------------------------------
|
|
// NOTCONSOLE
|
|
|
|
The top level aggregation must be exclusively one of the following:
|
|
* A {ref}/search-aggregations-bucket.html[bucket aggregation] containing a single
|
|
sub-aggregation that is a `date_histogram`
|
|
* A top level aggregation that is a `date_histogram`
|
|
* A top level aggregation is a `composite` aggregation.
|
|
|
|
There must be exactly one `date_histogram`, `composite` aggregation. For more information, see
|
|
{ref}/search-aggregations-bucket-datehistogram-aggregation.html[Date histogram aggregation] and
|
|
{ref}/search-aggregations-bucket-composite-aggregation.html[Composite aggregation].
|
|
|
|
NOTE: The `time_zone` parameter in the date histogram aggregation must be set to
|
|
`UTC`, which is the default value.
|
|
|
|
Each histogram or composite bucket has a key, which is the bucket start time.
|
|
This key cannot be used for aggregations in {dfeeds}, however, because
|
|
they need to know the time of the latest record within a bucket.
|
|
Otherwise, when you restart a {dfeed}, it continues from the start time of the
|
|
histogram or composite bucket and possibly fetches the same data twice.
|
|
The max aggregation for the time field is therefore necessary to provide
|
|
the time of the latest record within a bucket.
|
|
|
|
You can optionally specify a terms aggregation, which creates buckets for
|
|
different values of a field.
|
|
|
|
IMPORTANT: If you use a terms aggregation, by default it returns buckets for
|
|
the top ten terms. Thus if the cardinality of the term is greater than 10, not
|
|
all terms are analyzed. In this case, consider using `composite` aggregations
|
|
experimental:[Support for composite aggregations inside datafeeds is currently experimental].
|
|
|
|
You can change this behavior by setting the `size` parameter. To
|
|
determine the cardinality of your data, you can run searches such as:
|
|
|
|
[source,js]
|
|
--------------------------------------------------
|
|
GET .../_search
|
|
{
|
|
"aggs": {
|
|
"service_cardinality": {
|
|
"cardinality": {
|
|
"field": "service"
|
|
}
|
|
}
|
|
}
|
|
}
|
|
--------------------------------------------------
|
|
// NOTCONSOLE
|
|
|
|
|
|
By default, {es} limits the maximum number of terms returned to 10000. For high
|
|
cardinality fields, the query might not run. It might return errors related to
|
|
circuit breaking exceptions that indicate that the data is too large. In such
|
|
cases, use `composite` aggregations in your {dfeed}. For more information, see
|
|
{ref}/search-aggregations-bucket-terms-aggregation.html[Terms aggregation].
|
|
|
|
You can also optionally specify multiple sub-aggregations. The sub-aggregations
|
|
are aggregated for the buckets that were created by their parent aggregation.
|
|
For more information, see {ref}/search-aggregations.html[Aggregations].
|