Merge main into multi-project

Yang Wang 2024-12-22 19:36:33 +11:00
commit e790688377
284 changed files with 6174 additions and 1632 deletions

View file

@@ -1,95 +1,3 @@
import com.bettercloud.vault.VaultConfig
import com.bettercloud.vault.Vault
initscript {
repositories {
mavenCentral()
}
dependencies {
classpath 'com.bettercloud:vault-java-driver:4.1.0'
}
}
boolean USE_ARTIFACTORY = false
if (System.getenv('VAULT_ADDR') == null) {
// When trying to reproduce errors outside of CI, it can be useful to allow this to just return rather than blowing up
if (System.getenv('CI') == null) {
return
}
throw new GradleException("You must set the VAULT_ADDR environment variable to use this init script.")
}
if (System.getenv('VAULT_ROLE_ID') == null && System.getenv('VAULT_SECRET_ID') == null && System.getenv('VAULT_TOKEN') == null) {
// When trying to reproduce errors outside of CI, it can be useful to allow this to just return rather than blowing up
if (System.getenv('CI') == null) {
return
}
throw new GradleException("You must set either the VAULT_ROLE_ID and VAULT_SECRET_ID environment variables, " +
"or the VAULT_TOKEN environment variable to use this init script.")
}
final String vaultPathPrefix = System.getenv('VAULT_ADDR') ==~ /.+vault-ci.+\.dev.*/ ? "secret/ci/elastic-elasticsearch/migrated" : "secret/elasticsearch-ci"
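// Authenticate with an existing VAULT_TOKEN if one is set; otherwise log in via AppRole to obtain a token.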
final String vaultToken = System.getenv('VAULT_TOKEN') ?: new Vault(
new VaultConfig()
.address(System.env.VAULT_ADDR)
.engineVersion(1)
.build()
)
.withRetries(5, 1000)
.auth()
.loginByAppRole("approle", System.env.VAULT_ROLE_ID, System.env.VAULT_SECRET_ID)
.getAuthClientToken()
final Vault vault = new Vault(
new VaultConfig()
.address(System.env.VAULT_ADDR)
.engineVersion(1)
.token(vaultToken)
.build()
)
.withRetries(5, 1000)
if (USE_ARTIFACTORY) {
final Map<String, String> artifactoryCredentials = vault.logical()
.read("${vaultPathPrefix}/artifactory.elstc.co")
.getData()
logger.info("Using elastic artifactory repos")
Closure configCache = {
return {
name "artifactory-gradle-release"
url "https://artifactory.elstc.co/artifactory/gradle-release"
credentials {
username artifactoryCredentials.get("username")
password artifactoryCredentials.get("token")
}
}
}
settingsEvaluated { settings ->
settings.pluginManagement {
repositories {
maven configCache()
}
}
}
projectsLoaded {
allprojects {
buildscript {
repositories {
maven configCache()
}
}
repositories {
maven configCache()
}
}
}
}
gradle.settingsEvaluated { settings ->
settings.pluginManager.withPlugin("com.gradle.develocity") {
settings.develocity {
@@ -98,14 +6,10 @@ gradle.settingsEvaluated { settings ->
}
}
final String buildCacheUrl = System.getProperty('org.elasticsearch.build.cache.url')
final boolean buildCachePush = Boolean.valueOf(System.getProperty('org.elasticsearch.build.cache.push', 'false'))
if (buildCacheUrl) {
final Map<String, String> buildCacheCredentials = System.getenv("GRADLE_BUILD_CACHE_USERNAME") ? [:] : vault.logical()
.read("${vaultPathPrefix}/gradle-build-cache")
.getData()
gradle.settingsEvaluated { settings ->
settings.buildCache {
local {
@@ -116,11 +20,10 @@ if (buildCacheUrl) {
url = buildCacheUrl
push = buildCachePush
credentials {
username = System.getenv("GRADLE_BUILD_CACHE_USERNAME") ?: buildCacheCredentials.get("username")
password = System.getenv("GRADLE_BUILD_CACHE_PASSWORD") ?: buildCacheCredentials.get("password")
username = System.getenv("GRADLE_BUILD_CACHE_USERNAME")
password = System.getenv("GRADLE_BUILD_CACHE_PASSWORD")
}
}
}
}
}

View file

@@ -24,12 +24,17 @@ allprojects {
}
}
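// Injected exposes FileSystemOperations to tasks so doLast blocks can copy files without touching the Project at execution time, a configuration-cache-friendly pattern.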
interface Injected {
@Inject FileSystemOperations getFs()
}
// Applying this stuff, particularly the idea-ext plugin, has a cost so avoid it unless we're running in the IDE
if (providers.systemProperty('idea.active').getOrNull() == 'true') {
project.apply(plugin: org.jetbrains.gradle.ext.IdeaExtPlugin)
def elasticsearchProject = locateElasticsearchWorkspace(gradle)
def rootFolder = project.rootDir
tasks.register('configureIdeCheckstyle') {
group = 'ide'
description = 'Generates a suitable checkstyle config for IDEs'
@@ -39,10 +44,10 @@ if (providers.systemProperty('idea.active').getOrNull() == 'true') {
String checkstyleConfig = "${resources}/checkstyle.xml"
String checkstyleSuppressions = "${resources}/checkstyle_suppressions.xml"
String checkstyleIdeFragment = "${resources}/checkstyle_ide_fragment.xml"
String checkstyleIdeConfig = "${rootDir}/checkstyle_ide.xml"
String checkstyleIdeConfig = "${rootFolder}/checkstyle_ide.xml"
String checkstylePluginConfigTemplate = "${resources}/checkstyle-idea.xml"
String checkstylePluginConfig = "${rootDir}/.idea/checkstyle-idea.xml"
String checkstylePluginConfig = "${rootFolder}/.idea/checkstyle-idea.xml"
inputs.files(
file(checkstyleConfig),
@@ -53,31 +58,33 @@ if (providers.systemProperty('idea.active').getOrNull() == 'true') {
file(checkstyleIdeConfig),
file(checkstylePluginConfig)
)
def injected = project.objects.newInstance(Injected)
def projectFolder = project.layout.projectDirectory.asFile
doLast {
// Configure the IntelliJ Checkstyle plugin by copying a standard file. We don't simply commit
// the result to version control, because the plugin has a habit of modifying the file and
// replacing the `$PROJECT_DIR$` placeholders, which developers must then revert.
project.copy {
injected.fs.copy {
from(checkstylePluginConfigTemplate)
into("${rootDir}/.idea")
into("${rootFolder}/.idea")
expand(jarLocation: buildConventionsJar, configLocation: checkstyleIdeConfig)
}
// Create an IDE-specific checkstyle config by first copying the standard config
Files.copy(
Paths.get(file(checkstyleConfig).getPath()),
Paths.get(file(checkstyleIdeConfig).getPath()),
Paths.get(new File(checkstyleConfig).getPath()),
Paths.get(new File(checkstyleIdeConfig).getPath()),
StandardCopyOption.REPLACE_EXISTING
)
// There are some rules that we only want to enable in an IDE. These
// are extracted to a separate file, and merged into the IDE-specific
// Checkstyle config.
Node xmlFragment = parseXml(checkstyleIdeFragment)
Node xmlFragment = IdeaXmlUtil.parseXml(checkstyleIdeFragment)
// Edit the copy so that IntelliJ can cope with it
modifyXml(checkstyleIdeConfig, { xml ->
IdeaXmlUtil.modifyXml(checkstyleIdeConfig, { xml ->
// Add all the nodes from the fragment file
Node treeWalker = xml.module.find { it.'@name' == 'TreeWalker' }
xmlFragment.module.each { treeWalker.append(it) }
@@ -103,7 +110,7 @@ if (providers.systemProperty('idea.active').getOrNull() == 'true') {
description = 'Configures the appropriate JVM for Gradle'
doLast {
modifyXml('.idea/gradle.xml') { xml ->
IdeaXmlUtil.modifyXml('.idea/gradle.xml') { xml ->
def gradleSettings = xml.component.find { it.'@name' == 'GradleSettings' }.option[0].GradleProjectSettings
// Remove configured JVM option to force IntelliJ to use the project JDK for Gradle
gradleSettings.option.findAll { it.'@name' == 'gradleJvm' }.each { it.parent().remove(it) }
@@ -127,7 +134,7 @@ if (providers.systemProperty('idea.active').getOrNull() == 'true') {
description = 'Enable per-module *.iml files'
doLast {
modifyXml('.idea/misc.xml') {xml ->
IdeaXmlUtil.modifyXml('.idea/misc.xml') {xml ->
def externalStorageConfig = xml.component.find { it.'@name' == 'ExternalStorageConfigurationManager' }
if (externalStorageConfig) {
xml.remove(externalStorageConfig)
@@ -142,13 +149,13 @@ if (providers.systemProperty('idea.active').getOrNull() == 'true') {
description = 'Enables preview features on native library module'
dependsOn tasks.named("enableExternalConfiguration")
ext {
enablePreview = { moduleFile, languageLevel ->
modifyXml(moduleFile) { xml ->
// ext {
def enablePreview = { moduleFile, languageLevel ->
IdeaXmlUtil.modifyXml(moduleFile) { xml ->
xml.component.find { it.'@name' == 'NewModuleRootManager' }?.'@LANGUAGE_LEVEL' = languageLevel
}
}
}
// }
doLast {
enablePreview('.idea/modules/libs/native/elasticsearch.libs.native.main.iml', 'JDK_21_PREVIEW')
@@ -278,12 +285,22 @@ if (providers.systemProperty('idea.active').getOrNull() == 'true') {
* @param preface optional front matter to add after the XML declaration
* but before the XML document, e.g. a doctype or comment
*/
void modifyXml(Object path, Action<? super Node> action, String preface = null) {
if (project.file(path).exists()) {
Node xml = parseXml(path)
class IdeaXmlUtil {
static Node parseXml(Object xmlPath) {
File xmlFile = new File(xmlPath)
XmlParser xmlParser = new XmlParser(false, true, true)
xmlParser.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd", false)
Node xml = xmlParser.parse(xmlFile)
return xml
}
static void modifyXml(Object xmlPath, Action<? super Node> action, String preface = null) {
File xmlFile = new File(xmlPath)
if (xmlFile.exists()) {
Node xml = parseXml(xmlPath)
action.execute(xml)
File xmlFile = project.file(path)
xmlFile.withPrintWriter { writer ->
def printer = new XmlNodePrinter(writer)
printer.namespaceAware = true
@@ -297,15 +314,9 @@ void modifyXml(Object path, Action<? super Node> action, String preface = null)
}
}
}
Node parseXml(Object path) {
File xmlFile = project.file(path)
XmlParser xmlParser = new XmlParser(false, true, true)
xmlParser.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd", false)
Node xml = xmlParser.parse(xmlFile)
return xml
}
Pair<File, IncludedBuild> locateElasticsearchWorkspace(Gradle gradle) {
if (gradle.parent == null) {
// See if any of these included builds is the Elasticsearch gradle

View file

@@ -1,21 +1,17 @@
---
apiVersion: v1
# The repository name in registry1, excluding /ironbank/
name: "elastic/elasticsearch/elasticsearch"
# List of tags to push for the repository in registry1
# The most specific version should be the first tag and will be shown
# on ironbank.dsop.io
tags:
- "${version}"
- "latest"
# Build args passed to Dockerfile ARGs
args:
BASE_IMAGE: "redhat/ubi/ubi9"
BASE_TAG: "9.4"
BASE_TAG: "9.5"
# Docker image labels
labels:
org.opencontainers.image.title: "elasticsearch"
@@ -34,7 +30,6 @@ labels:
mil.dso.ironbank.image.type: "commercial"
# Product the image belongs to for grouping multiple images
mil.dso.ironbank.product.name: "elasticsearch"
# List of resources to make available to the offline build context
resources:
- filename: "elasticsearch-${version}-linux-x86_64.tar.gz"
@@ -47,7 +42,6 @@ resources:
validation:
type: "sha256"
value: "93dcc18adc78c65a028a84799ecf8ad40c936fdfc5f2a57b1acda5a8117fa82c"
# List of project maintainers
maintainers:
- name: "Mark Vieira"

View file

@@ -0,0 +1,5 @@
pr: 116687
summary: Add LogsDB option to route on sort fields
area: Logs
type: enhancement
issues: []

View file

@@ -0,0 +1,5 @@
pr: 118353
summary: Epoch Millis Rounding Down and Not Up 2
area: Infra/Core
type: bug
issues: []

View file

@@ -0,0 +1,6 @@
pr: 118562
summary: Update data stream deprecations warnings to new format and filter searchable
snapshots from response
area: Data streams
type: enhancement
issues: []

View file

@@ -0,0 +1,6 @@
pr: 118603
summary: Allow DATE_PARSE to read the timezones
area: ES|QL
type: bug
issues:
- 117680

View file

@@ -0,0 +1,5 @@
pr: 118802
summary: ST_EXTENT_AGG optimize envelope extraction from doc-values for cartesian_shape
area: "ES|QL"
type: enhancement
issues: []

View file

@@ -0,0 +1,6 @@
pr: 118931
summary: Add a `LicenseAware` interface for licensed Nodes
area: ES|QL
type: enhancement
issues:
- 117405

View file

@@ -0,0 +1,5 @@
pr: 118941
summary: Allow archive and searchable snapshots indices in N-2 version
area: Recovery
type: enhancement
issues: []

View file

@@ -0,0 +1,5 @@
pr: 119131
summary: Expose BwC enrich cache setting in plugin
area: Ingest Node
type: bug
issues: []

View file

@@ -112,7 +112,7 @@ it is necessary to use the search function, like <<esql-match>>, in a <<esql-whe
directly after the <<esql-from>> source command, or close enough to it.
Otherwise, the query will fail with a validation error.
Another limitation is that any <<esql-where>> command containing a full-text search function
cannot also use disjunctions (`OR`).
cannot also use disjunctions (`OR`) unless all functions used in the `OR` clauses are full-text functions themselves.
For example, this query is valid:
@@ -139,6 +139,15 @@ FROM books
| WHERE MATCH(author, "Faulkner") OR author LIKE "Hemingway"
----
However, this query will succeed because it uses full text functions on both `OR` clauses:
[source,esql]
----
FROM books
| WHERE MATCH(author, "Faulkner") OR QSTR("author: Hemingway")
----
Note that, because of <<esql-limitations-text-fields,the way {esql} treats `text` values>>,
any queries on `text` fields that do not explicitly use the full-text functions,
<<esql-match>> or <<esql-qstr>>, will behave as if the fields are actually `keyword` fields:

View file

@@ -2,4 +2,4 @@
*Description*
Performs a <<query-dsl-match-query,match query>> on the specified field. Returns true if the provided query matches the row.
Use `MATCH` to perform a <<query-dsl-match-query,match query>> on the specified field. Using `MATCH` is equivalent to using the `match` query in the Elasticsearch Query DSL. Match can be used on text fields, as well as other field types like boolean, dates, and numeric types. For a simplified syntax, you can use the <<esql-search-operators,match operator>> `:` operator instead of `MATCH`. `MATCH` returns true if the provided query matches the row.
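For example (an illustrative sketch based on the description above, not generated output), these two queries are equivalent:
[source,esql]
----
FROM books
| WHERE MATCH(author, "Faulkner")
----
[source,esql]
----
FROM books
| WHERE author:"Faulkner"
----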

View file

@@ -2,7 +2,7 @@
"comment" : "This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.",
"type" : "eval",
"name" : "match",
"description" : "Performs a <<query-dsl-match-query,match query>> on the specified field. Returns true if the provided query matches the row.",
"description" : "Use `MATCH` to perform a <<query-dsl-match-query,match query>> on the specified field.\nUsing `MATCH` is equivalent to using the `match` query in the Elasticsearch Query DSL.\n\nMatch can be used on text fields, as well as other field types like boolean, dates, and numeric types.\n\nFor a simplified syntax, you can use the <<esql-search-operators,match operator>> `:` operator instead of `MATCH`.\n\n`MATCH` returns true if the provided query matches the row.",
"signatures" : [
{
"params" : [

View file

@@ -3,7 +3,14 @@ This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../READ
-->
### MATCH
Performs a <<query-dsl-match-query,match query>> on the specified field. Returns true if the provided query matches the row.
Use `MATCH` to perform a <<query-dsl-match-query,match query>> on the specified field.
Using `MATCH` is equivalent to using the `match` query in the Elasticsearch Query DSL.
Match can be used on text fields, as well as other field types like boolean, dates, and numeric types.
For a simplified syntax, you can use the <<esql-search-operators,match operator>> `:` operator instead of `MATCH`.
`MATCH` returns true if the provided query matches the row.
```
FROM books

View file

@@ -6,11 +6,13 @@
++++
Full text functions are used to search for text in fields.
<<analysis, Text analysiss>> is used to analyze the query before it is searched.
<<analysis, Text analysis>> is used to analyze the query before it is searched.
Full text functions can be used to match <<esql-multivalued-fields,multivalued fields>>.
A multivalued field that contains a value that matches a full text query is considered to match the query.
Full text functions are significantly more performant for text search use cases on large data sets than using pattern matching or regular expressions with `LIKE` or `RLIKE`.
See <<esql-limitations-full-text-search,full text search limitations>> for information on the limitations of full text search.
{esql} supports these full-text search functions:

View file

@@ -58,6 +58,26 @@ For a complete list of all functions, refer to <<esql-functions>>.
include::../functions/predicates.asciidoc[tag=body]
For matching text, you can use <<esql-search-functions,full text search functions>> like `MATCH`.
Use <<esql-match,`MATCH`>> to perform a <<query-dsl-match-query,match query>> on a specified field.
Match can be used on text fields, as well as other field types like boolean, dates, and numeric types.
[source.merge.styled,esql]
----
include::{esql-specs}/match-function.csv-spec[tag=match-with-field]
----
[%header.monospaced.styled,format=dsv,separator=|]
|===
include::{esql-specs}/match-function.csv-spec[tag=match-with-field-result]
|===
[TIP]
====
You can also use the shorthand <<esql-search-operators,match operator>> `:` instead of `MATCH`.
====
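For example (an illustrative sketch reusing the `books` index from the snippet above):
[source,esql]
----
FROM books
| WHERE author:"Faulkner"
----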
include::../functions/like.asciidoc[tag=body]
include::../functions/rlike.asciidoc[tag=body]

View file

@@ -172,9 +172,8 @@ The API returns the following response:
"attributes": {},
"roles": [...],
"version": "8.10.0",
"min_index_version": 8000099,
"min_read_only_index_version": 7000099,
"max_index_version": 9004000
"min_index_version": 7000099,
"max_index_version": 8100099
},
"allocation_id": "2iNySv_OQVePRX-yaRH_lQ", <4>
"allocation" : "primary|replica|unused" <5>
@@ -194,7 +193,6 @@ The API returns the following response:
// TESTRESPONSE[s/"roles": \[[^]]*\]/"roles": $body.$_path/]
// TESTRESPONSE[s/"8.10.0"/\$node_version/]
// TESTRESPONSE[s/"min_index_version": 7000099/"min_index_version": $body.$_path/]
// TESTRESPONSE[s/"min_index_version": 7000099/"min_index_version": $body.$_path/]
// TESTRESPONSE[s/"max_index_version": 8100099/"max_index_version": $body.$_path/]

View file

@@ -1,103 +1,149 @@
[[task-queue-backlog]]
=== Task queue backlog
=== Backlogged task queue
A backlogged task queue can prevent tasks from completing and put the cluster
into an unhealthy state. Resource constraints, a large number of tasks being
triggered at once, and long running tasks can all contribute to a backlogged
task queue.
*******************************
*Product:* Elasticsearch +
*Deployment type:* Elastic Cloud Enterprise, Elastic Cloud Hosted, Elastic Cloud on Kubernetes, Elastic Self-Managed +
*Versions:* All
*******************************
A backlogged task queue can prevent tasks from completing and lead to an
unhealthy cluster state. Contributing factors include resource constraints,
a large number of tasks triggered at once, and long-running tasks.
[discrete]
[[diagnose-task-queue-backlog]]
==== Diagnose a task queue backlog
==== Diagnose a backlogged task queue
**Check the thread pool status**
To identify the cause of the backlog, try these diagnostic actions.
* <<diagnose-task-queue-thread-pool>>
* <<diagnose-task-queue-hot-thread>>
* <<diagnose-task-queue-long-running-node-tasks>>
* <<diagnose-task-queue-long-running-cluster-tasks>>
[discrete]
[[diagnose-task-queue-thread-pool]]
===== Check the thread pool status
A <<high-cpu-usage,depleted thread pool>> can result in
<<rejected-requests,rejected requests>>.
Thread pool depletion might be restricted to a specific <<data-tiers,data tier>>. If <<hotspotting,hot spotting>> is occurring, one node might experience depletion faster than other nodes, leading to performance issues and a growing task backlog.
You can use the <<cat-thread-pool,cat thread pool API>> to see the number of
active threads in each thread pool and how many tasks are queued, how many
have been rejected, and how many have completed.
Use the <<cat-thread-pool,cat thread pool API>> to monitor
active threads, queued tasks, rejections, and completed tasks:
[source,console]
----
GET /_cat/thread_pool?v&s=t,n&h=type,name,node_name,active,queue,rejected,completed
----
The `active` and `queue` statistics are instantaneous while the `rejected` and
`completed` statistics are cumulative from node startup.
* Look for high `active` and `queue` metrics, which indicate potential bottlenecks
and opportunities to <<reduce-cpu-usage,reduce CPU usage>>.
* Determine whether thread pool issues are specific to a <<data-tiers,data tier>>.
* Check whether a specific node's thread pool is depleting faster than others. This
might indicate <<resolve-task-queue-backlog-hotspotting, hot spotting>>.
**Inspect the hot threads on each node**
[discrete]
[[diagnose-task-queue-hot-thread]]
===== Inspect hot threads on each node
If a particular thread pool queue is backed up, you can periodically poll the
<<cluster-nodes-hot-threads,Nodes hot threads>> API to determine if the thread
has sufficient resources to progress and gauge how quickly it is progressing.
If a particular thread pool queue is backed up, periodically poll the
<<cluster-nodes-hot-threads,nodes hot threads API>> to gauge the thread's
progression and ensure it has sufficient resources:
[source,console]
----
GET /_nodes/hot_threads
----
**Look for long running node tasks**
Although the hot threads API response does not list the specific tasks running on a thread,
it provides a summary of the thread's activities. You can correlate a hot threads response
with a <<tasks,task management API response>> to identify any overlap with specific tasks. For
example, if the hot threads response indicates the thread is `performing a search query`, you can
<<diagnose-task-queue-long-running-node-tasks,check for long-running search tasks>> using the task management API.
Long-running tasks can also cause a backlog. You can use the <<tasks,task
management>> API to get information about the node tasks that are running.
Check the `running_time_in_nanos` to identify tasks that are taking an
excessive amount of time to complete.
[discrete]
[[diagnose-task-queue-long-running-node-tasks]]
===== Identify long-running node tasks
Long-running tasks can also cause a backlog. Use the <<tasks,task
management API>> to check for excessive `running_time_in_nanos` values:
[source,console]
----
GET /_tasks?pretty=true&human=true&detailed=true
----
If a particular `action` is suspected, you can filter the tasks further. The most common long-running tasks are <<docs-bulk,bulk index>>- or search-related.
You can filter on a specific `action`, such as <<docs-bulk,bulk indexing>> or search-related tasks.
These tend to be long-running.
* Filter for <<docs-bulk,bulk index>> actions:
* Filter on <<docs-bulk,bulk index>> actions:
+
[source,console]
----
GET /_tasks?human&detailed&actions=indices:data/write/bulk
----
* Filter for search actions:
* Filter on search actions:
+
[source,console]
----
GET /_tasks?human&detailed&actions=indices:data/read/search
----
The API response may contain additional task columns, including `description` and `header`, which provide the task parameters, target, and requestor. You can use this information to perform further diagnosis.
Long-running tasks might need to be <<resolve-task-queue-backlog-stuck-tasks,canceled>>.
**Look for long running cluster tasks**
[discrete]
[[diagnose-task-queue-long-running-cluster-tasks]]
===== Look for long-running cluster tasks
A task backlog might also appear as a delay in synchronizing the cluster state. You
can use the <<cluster-pending,cluster pending tasks API>> to get information
about the pending cluster state sync tasks that are running.
Use the <<cluster-pending,cluster pending tasks API>> to identify delays
in cluster state synchronization:
[source,console]
----
GET /_cluster/pending_tasks
----
Check the `timeInQueue` to identify tasks that are taking an excessive amount
of time to complete.
Tasks with a high `timeInQueue` value are likely contributing to the backlog and might
need to be <<resolve-task-queue-backlog-stuck-tasks,canceled>>.
[discrete]
[[resolve-task-queue-backlog]]
==== Resolve a task queue backlog
==== Recommendations
**Increase available resources**
After identifying problematic threads and tasks, resolve the issue by increasing resources or canceling tasks.
If tasks are progressing slowly and the queue is backing up,
you might need to take steps to <<reduce-cpu-usage>>.
[discrete]
[[resolve-task-queue-backlog-resources]]
===== Increase available resources
In some cases, increasing the thread pool size might help.
For example, the `force_merge` thread pool defaults to a single thread.
If tasks are progressing slowly, try <<reduce-cpu-usage,reducing CPU usage>>.
In some cases, you might need to increase the thread pool size. For example, the `force_merge` thread pool defaults to a single thread.
Increasing the size to 2 might help reduce a backlog of force merge requests.
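For example, setting `thread_pool.force_merge.size: 2` in `elasticsearch.yml` would do this; note that thread pool sizes are static, node-level settings, and the value here is purely illustrative.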
**Cancel stuck tasks**
[discrete]
[[resolve-task-queue-backlog-stuck-tasks]]
===== Cancel stuck tasks
If you find the active task's hot thread isn't progressing and there's a backlog,
consider canceling the task.
If an active task's <<diagnose-task-queue-hot-thread,hot thread>> shows no progress, consider <<task-cancellation,canceling the task>>.
[discrete]
[[resolve-task-queue-backlog-hotspotting]]
===== Address hot spotting
If a specific node's thread pool is depleting faster than others, try addressing
uneven node resource utilization, also known as hot spotting.
For details on actions you can take, such as rebalancing shards, see <<hotspotting>>.
[discrete]
==== Resources
Related symptoms:
* <<high-cpu-usage>>
* <<rejected-requests>>
* <<hotspotting>>
// TODO add link to standard Additional resources when that topic exists

View file

@@ -55,7 +55,7 @@ public class RestEntitlementsCheckAction extends BaseRestHandler {
entry("runtime_exit", deniedToPlugins(RestEntitlementsCheckAction::runtimeExit)),
entry("runtime_halt", deniedToPlugins(RestEntitlementsCheckAction::runtimeHalt)),
entry("create_classloader", forPlugins(RestEntitlementsCheckAction::createClassLoader)),
// entry("processBuilder_start", deniedToPlugins(RestEntitlementsCheckAction::processBuilder_start)),
entry("processBuilder_start", deniedToPlugins(RestEntitlementsCheckAction::processBuilder_start)),
entry("processBuilder_startPipeline", deniedToPlugins(RestEntitlementsCheckAction::processBuilder_startPipeline))
);
@@ -78,7 +78,11 @@ public class RestEntitlementsCheckAction extends BaseRestHandler {
}
private static void processBuilder_start() {
// TODO: processBuilder().start();
try {
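// Exercise ProcessBuilder.start() so the entitlement instrumentation can intercept it; deniedToPlugins above registers this as an operation plugins must not perform.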
new ProcessBuilder("").start();
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
private static void processBuilder_startPipeline() {

View file

@@ -34,6 +34,7 @@ import java.lang.reflect.InvocationTargetException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -72,18 +73,18 @@ public class EntitlementInitialization {
Instrumenter instrumenter = INSTRUMENTER_FACTORY.newInstrumenter(EntitlementChecker.class, checkMethods);
inst.addTransformer(new Transformer(instrumenter, classesToTransform), true);
// TODO: should we limit this array somehow?
var classesToRetransform = classesToTransform.stream().map(EntitlementInitialization::internalNameToClass).toArray(Class[]::new);
inst.retransformClasses(classesToRetransform);
inst.retransformClasses(findClassesToRetransform(inst.getAllLoadedClasses(), classesToTransform));
}
private static Class<?> internalNameToClass(String internalName) {
try {
return Class.forName(internalName.replace('/', '.'), false, ClassLoader.getPlatformClassLoader());
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
private static Class<?>[] findClassesToRetransform(Class<?>[] loadedClasses, Set<String> classesToTransform) {
List<Class<?>> retransform = new ArrayList<>();
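// The transform set holds JVM internal names (slash-separated), so convert each loaded class's binary name before the membership check.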
for (Class<?> loadedClass : loadedClasses) {
if (classesToTransform.contains(loadedClass.getName().replace(".", "/"))) {
retransform.add(loadedClass);
}
}
return retransform.toArray(new Class<?>[0]);
}
private static PolicyManager createPolicyManager() throws IOException {
Map<String, Policy> pluginPolicies = createPluginPolicies(EntitlementBootstrap.bootstrapArgs().pluginData());

View file

@@ -99,17 +99,12 @@ public class TimeSeriesTsidHashCardinalityIT extends ESSingleNodeTestCase {
.setSettings(
settings.put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), IndexVersions.NEW_INDEXVERSION_FORMAT).build()
)
.setMapping(mapping)
.get()
);
assertAcked(
.setMapping(mapping),
indicesAdmin().prepareCreate(afterIndex)
.setSettings(
settings.put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), IndexVersions.TIME_SERIES_ID_HASHING).build()
)
.setMapping(mapping)
.get()
);
final TimeSeriesDataset timeSeriesDataset = new TimeSeriesDataset();

View file

@@ -37,3 +37,16 @@ artifacts {
tasks.named("yamlRestCompatTestTransform").configure { task ->
task.replaceValueInMatch("tokens.0.token", "absenț", "romanian")
}
tasks.named("yamlRestTest").configure {
if (buildParams.getRuntimeJavaVersion().map{ it.majorVersion.toInteger() }.get() >= 24 ||
"-Des.entitlements.enabled=true".equals(System.getProperty("tests.jvm.argline"))) {
systemProperty 'tests.rest.blacklist',
[
// AWAITSFIX: this test relies on security manager, which doesn't exist in JDK 24.
// and entitlements don't yet replace the functionality.
// see https://github.com/elastic/elasticsearch/issues/119130
'analysis-common/40_token_filters/stemmer_override file access',
].join(',')
}
}

View file

@@ -89,8 +89,6 @@ public class StemmerTokenFilterFactory extends AbstractTokenFilterFactory {
private final String language;
private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(StemmerTokenFilterFactory.class);
StemmerTokenFilterFactory(IndexSettings indexSettings, Environment environment, String name, Settings settings) throws IOException {
super(name);
this.language = Strings.capitalize(settings.get("language", settings.get("name", "porter")));
@@ -192,7 +190,7 @@
} else if ("german".equalsIgnoreCase(language)) {
return new SnowballFilter(tokenStream, new GermanStemmer());
} else if ("german2".equalsIgnoreCase(language)) {
DEPRECATION_LOGGER.critical(
deprecationLogger.critical(
DeprecationCategory.ANALYSIS,
"german2_stemmer_deprecation",
"The 'german2' stemmer has been deprecated and folded into the 'german' Stemmer. "

View file

@@ -13,7 +13,6 @@ import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.synonym.SynonymFilter;
import org.apache.lucene.analysis.synonym.SynonymMap;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexService.IndexCreationContext;
@@ -130,8 +129,6 @@ public class SynonymTokenFilterFactory extends AbstractTokenFilterFactory {
}
}
private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(SynonymTokenFilterFactory.class);
private final String format;
private final boolean expand;
private final boolean lenient;

View file

@@ -12,6 +12,9 @@ package org.elasticsearch.datastreams;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestStatus;
import org.junit.Before;
import java.io.IOException;
@@ -122,13 +125,25 @@ public class DataStreamOptionsIT extends DisabledSecurityDataStreamTestCase {
assertOK(client().performRequest(otherRequest));
}
public void testEnableDisableFailureStore() throws IOException {
public void testBehaviorWithEachFailureStoreOptionAndClusterSetting() throws IOException {
{
// Default data stream options
assertAcknowledged(client().performRequest(new Request("DELETE", "/_data_stream/" + DATA_STREAM_NAME + "/_options")));
assertFailureStore(false, 1);
setDataStreamFailureStoreClusterSetting(DATA_STREAM_NAME);
assertDataStreamOptions(null);
assertFailureStoreValuesInGetDataStreamResponse(true, 1);
assertRedirectsDocWithBadMappingToFailureStore();
setDataStreamFailureStoreClusterSetting("does-not-match-failure-data-stream");
assertDataStreamOptions(null);
assertFailureStoreValuesInGetDataStreamResponse(false, 1);
assertFailsDocWithBadMapping();
setDataStreamFailureStoreClusterSetting(null); // should get same behaviour as when we set it to something non-matching
assertDataStreamOptions(null);
assertFailureStoreValuesInGetDataStreamResponse(false, 1);
assertFailsDocWithBadMapping();
}
{
// Data stream options with failure store enabled
Request enableRequest = new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME + "/_options");
enableRequest.setJsonEntity("""
{
@@ -137,11 +152,21 @@ public class DataStreamOptionsIT extends DisabledSecurityDataStreamTestCase {
}
}""");
assertAcknowledged(client().performRequest(enableRequest));
assertFailureStore(true, 1);
setDataStreamFailureStoreClusterSetting(DATA_STREAM_NAME);
assertDataStreamOptions(true);
assertFailureStoreValuesInGetDataStreamResponse(true, 1);
assertRedirectsDocWithBadMappingToFailureStore();
setDataStreamFailureStoreClusterSetting("does-not-match-failure-data-stream"); // should have no effect as enabled in options
assertDataStreamOptions(true);
assertFailureStoreValuesInGetDataStreamResponse(true, 1);
assertRedirectsDocWithBadMappingToFailureStore();
setDataStreamFailureStoreClusterSetting(null); // same as previous
assertDataStreamOptions(true);
assertFailureStoreValuesInGetDataStreamResponse(true, 1);
assertRedirectsDocWithBadMappingToFailureStore();
}
{
// Data stream options with failure store disabled
Request disableRequest = new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME + "/_options");
disableRequest.setJsonEntity("""
{
@@ -150,13 +175,23 @@ public class DataStreamOptionsIT extends DisabledSecurityDataStreamTestCase {
}
}""");
assertAcknowledged(client().performRequest(disableRequest));
assertFailureStore(false, 1);
setDataStreamFailureStoreClusterSetting(DATA_STREAM_NAME); // should have no effect as disabled in options
assertDataStreamOptions(false);
assertFailureStoreValuesInGetDataStreamResponse(false, 1);
assertFailsDocWithBadMapping();
setDataStreamFailureStoreClusterSetting("does-not-match-failure-data-stream");
assertDataStreamOptions(false);
assertFailureStoreValuesInGetDataStreamResponse(false, 1);
assertFailsDocWithBadMapping();
setDataStreamFailureStoreClusterSetting(null);
assertDataStreamOptions(false);
assertFailureStoreValuesInGetDataStreamResponse(false, 1);
assertFailsDocWithBadMapping();
}
}
@SuppressWarnings("unchecked")
private void assertFailureStore(boolean failureStoreEnabled, int failureStoreSize) throws IOException {
private void assertFailureStoreValuesInGetDataStreamResponse(boolean failureStoreEnabled, int failureStoreSize) throws IOException {
final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + DATA_STREAM_NAME));
List<Object> dataStreams = (List<Object>) entityAsMap(dataStreamResponse).get("data_streams");
assertThat(dataStreams.size(), is(1));
@@ -198,4 +233,32 @@ public class DataStreamOptionsIT extends DisabledSecurityDataStreamTestCase {
List<Map<String, String>> indices = (List<Map<String, String>>) response.get("indices");
return indices.stream().map(index -> index.get("index_name")).toList();
}
private static void setDataStreamFailureStoreClusterSetting(String value) throws IOException {
updateClusterSettings(
Settings.builder().put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), value).build()
);
}
private Response putDocumentWithBadMapping() throws IOException {
Request request = new Request("POST", DATA_STREAM_NAME + "/_doc");
request.setJsonEntity("""
{
"@timestamp": "not a timestamp",
"foo": "bar"
}
""");
return client().performRequest(request);
}
private void assertRedirectsDocWithBadMappingToFailureStore() throws IOException {
Response response = putDocumentWithBadMapping();
String failureStoreResponse = (String) entityAsMap(response).get("failure_store");
assertThat(failureStoreResponse, is("used"));
}
private void assertFailsDocWithBadMapping() {
ResponseException e = assertThrows(ResponseException.class, this::putDocumentWithBadMapping);
assertThat(e.getResponse().getStatusLine().getStatusCode(), is(RestStatus.BAD_REQUEST.getStatus()));
}
}

View file

@@ -26,6 +26,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings;
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -66,6 +67,7 @@ public class TransportGetDataStreamsAction extends TransportMasterNodeReadProjec
private final SystemIndices systemIndices;
private final ClusterSettings clusterSettings;
private final DataStreamGlobalRetentionSettings globalRetentionSettings;
private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings;
private final Client client;
@Inject
@@ -78,6 +80,7 @@ public class TransportGetDataStreamsAction extends TransportMasterNodeReadProjec
IndexNameExpressionResolver indexNameExpressionResolver,
SystemIndices systemIndices,
DataStreamGlobalRetentionSettings globalRetentionSettings,
DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
Client client
) {
super(
@@ -95,6 +98,7 @@ public class TransportGetDataStreamsAction extends TransportMasterNodeReadProjec
this.systemIndices = systemIndices;
this.globalRetentionSettings = globalRetentionSettings;
clusterSettings = clusterService.getClusterSettings();
this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings;
this.client = new OriginSettingClient(client, "stack");
}
@@ -126,6 +130,7 @@ public class TransportGetDataStreamsAction extends TransportMasterNodeReadProjec
systemIndices,
clusterSettings,
globalRetentionSettings,
dataStreamFailureStoreSettings,
maxTimestamps
)
);
@@ -138,7 +143,16 @@ public class TransportGetDataStreamsAction extends TransportMasterNodeReadProjec
});
} else {
listener.onResponse(
innerOperation(state, request, indexNameExpressionResolver, systemIndices, clusterSettings, globalRetentionSettings, null)
innerOperation(
state,
request,
indexNameExpressionResolver,
systemIndices,
clusterSettings,
globalRetentionSettings,
dataStreamFailureStoreSettings,
null
)
);
}
}
@@ -150,11 +164,16 @@ public class TransportGetDataStreamsAction extends TransportMasterNodeReadProjec
SystemIndices systemIndices,
ClusterSettings clusterSettings,
DataStreamGlobalRetentionSettings globalRetentionSettings,
DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
@Nullable Map<String, Long> maxTimestamps
) {
List<DataStream> dataStreams = getDataStreams(state.metadata(), indexNameExpressionResolver, request);
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = new ArrayList<>(dataStreams.size());
for (DataStream dataStream : dataStreams) {
// For this action, we are returning whether the failure store is effectively enabled, either in metadata or by cluster setting.
// Users can use the get data stream options API to find out whether it is explicitly enabled in metadata.
boolean failureStoreEffectivelyEnabled = DataStream.isFailureStoreFeatureFlagEnabled()
&& dataStream.isFailureStoreEffectivelyEnabled(dataStreamFailureStoreSettings);
final String indexTemplate;
boolean indexTemplatePreferIlmValue = true;
String ilmPolicyName = null;
@@ -259,6 +278,7 @@ public class TransportGetDataStreamsAction extends TransportMasterNodeReadProjec
dataStreamInfos.add(
new GetDataStreamAction.Response.DataStreamInfo(
dataStream,
failureStoreEffectivelyEnabled,
streamHealth.getStatus(),
indexTemplate,
ilmPolicyName,

View file

@@ -102,6 +102,7 @@ public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase
Response.DataStreamInfo dataStreamInfo = new Response.DataStreamInfo(
logs,
true,
ClusterHealthStatus.GREEN,
"index-template",
null,
@@ -205,6 +206,7 @@ public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase
Response.DataStreamInfo dataStreamInfo = new Response.DataStreamInfo(
logs,
true,
ClusterHealthStatus.GREEN,
"index-template",
null,
@@ -282,6 +284,7 @@ public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase
private Response.DataStreamInfo mutateInstance(Response.DataStreamInfo instance) {
var dataStream = instance.getDataStream();
var failureStoreEffectivelyEnabled = instance.isFailureStoreEffectivelyEnabled();
var status = instance.getDataStreamStatus();
var indexTemplate = instance.getIndexTemplate();
var ilmPolicyName = instance.getIlmPolicy();
@@ -289,7 +292,7 @@ public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase
var indexSettings = instance.getIndexSettingsValues();
var templatePreferIlm = instance.templatePreferIlmValue();
var maximumTimestamp = instance.getMaximumTimestamp();
switch (randomIntBetween(0, 7)) {
switch (randomIntBetween(0, 8)) {
case 0 -> dataStream = randomValueOtherThan(dataStream, DataStreamTestHelper::randomInstance);
case 1 -> status = randomValueOtherThan(status, () -> randomFrom(ClusterHealthStatus.values()));
case 2 -> indexTemplate = randomBoolean() && indexTemplate != null ? null : randomAlphaOfLengthBetween(2, 10);
@@ -314,9 +317,11 @@ public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase
case 7 -> maximumTimestamp = (maximumTimestamp == null)
? randomNonNegativeLong()
: (usually() ? randomValueOtherThan(maximumTimestamp, ESTestCase::randomNonNegativeLong) : null);
case 8 -> failureStoreEffectivelyEnabled = failureStoreEffectivelyEnabled == false;
}
return new Response.DataStreamInfo(
dataStream,
failureStoreEffectivelyEnabled,
status,
indexTemplate,
ilmPolicyName,
@@ -355,6 +360,7 @@ public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase
List<Tuple<Instant, Instant>> timeSeries = randomBoolean() ? generateRandomTimeSeries() : null;
return new Response.DataStreamInfo(
DataStreamTestHelper.randomInstance(),
randomBoolean(),
ClusterHealthStatus.GREEN,
randomAlphaOfLengthBetween(2, 10),
randomAlphaOfLengthBetween(2, 10),

View file

@@ -12,6 +12,7 @@ import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings;
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
@@ -39,6 +40,8 @@ import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
public class TransportGetDataStreamsActionTests extends ESTestCase {
@@ -48,6 +51,9 @@ public class TransportGetDataStreamsActionTests extends ESTestCase {
private final DataStreamGlobalRetentionSettings dataStreamGlobalRetentionSettings = DataStreamGlobalRetentionSettings.create(
ClusterSettings.createBuiltInClusterSettings()
);
private final DataStreamFailureStoreSettings emptyDataStreamFailureStoreSettings = DataStreamFailureStoreSettings.create(
ClusterSettings.createBuiltInClusterSettings()
);
public void testGetDataStream() {
final String dataStreamName = "my-data-stream";
@@ -173,6 +179,7 @@ public class TransportGetDataStreamsActionTests extends ESTestCase {
systemIndices,
ClusterSettings.createBuiltInClusterSettings(),
dataStreamGlobalRetentionSettings,
emptyDataStreamFailureStoreSettings,
null
);
assertThat(
@@ -204,6 +211,7 @@ public class TransportGetDataStreamsActionTests extends ESTestCase {
systemIndices,
ClusterSettings.createBuiltInClusterSettings(),
dataStreamGlobalRetentionSettings,
emptyDataStreamFailureStoreSettings,
null
);
assertThat(
@@ -256,6 +264,7 @@ public class TransportGetDataStreamsActionTests extends ESTestCase {
systemIndices,
ClusterSettings.createBuiltInClusterSettings(),
dataStreamGlobalRetentionSettings,
emptyDataStreamFailureStoreSettings,
null
);
assertThat(
@@ -296,6 +305,7 @@ public class TransportGetDataStreamsActionTests extends ESTestCase {
systemIndices,
ClusterSettings.createBuiltInClusterSettings(),
dataStreamGlobalRetentionSettings,
emptyDataStreamFailureStoreSettings,
null
);
@@ -329,6 +339,7 @@ public class TransportGetDataStreamsActionTests extends ESTestCase {
systemIndices,
ClusterSettings.createBuiltInClusterSettings(),
dataStreamGlobalRetentionSettings,
emptyDataStreamFailureStoreSettings,
null
);
assertThat(response.getGlobalRetention(), nullValue());
@@ -354,8 +365,102 @@ public class TransportGetDataStreamsActionTests extends ESTestCase {
systemIndices,
ClusterSettings.createBuiltInClusterSettings(),
withGlobalRetentionSettings,
emptyDataStreamFailureStoreSettings,
null
);
assertThat(response.getGlobalRetention(), equalTo(globalRetention));
}
public void testDataStreamIsFailureStoreEffectivelyEnabled_disabled() {
var metadata = new Metadata.Builder();
DataStreamTestHelper.getClusterStateWithDataStreams(
metadata,
List.of(Tuple.tuple("data-stream-1", 2)),
List.of(),
System.currentTimeMillis(),
Settings.EMPTY,
0,
false,
false
);
ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(metadata).build();
var req = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] {});
var response = TransportGetDataStreamsAction.innerOperation(
state,
req,
resolver,
systemIndices,
ClusterSettings.createBuiltInClusterSettings(),
dataStreamGlobalRetentionSettings,
emptyDataStreamFailureStoreSettings,
null
);
assertThat(response.getDataStreams(), hasSize(1));
assertThat(response.getDataStreams().getFirst().isFailureStoreEffectivelyEnabled(), is(false));
}
public void testDataStreamIsFailureStoreEffectivelyEnabled_enabledExplicitly() {
var metadata = new Metadata.Builder();
DataStreamTestHelper.getClusterStateWithDataStreams(
metadata,
List.of(Tuple.tuple("data-stream-1", 2)),
List.of(),
System.currentTimeMillis(),
Settings.EMPTY,
0,
false,
true
);
ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(metadata).build();
var req = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] {});
var response = TransportGetDataStreamsAction.innerOperation(
state,
req,
resolver,
systemIndices,
ClusterSettings.createBuiltInClusterSettings(),
dataStreamGlobalRetentionSettings,
emptyDataStreamFailureStoreSettings,
null
);
assertThat(response.getDataStreams(), hasSize(1));
assertThat(response.getDataStreams().getFirst().isFailureStoreEffectivelyEnabled(), is(true));
}
public void testDataStreamIsFailureStoreEffectivelyEnabled_enabledByClusterSetting() {
var metadata = new Metadata.Builder();
DataStreamTestHelper.getClusterStateWithDataStreams(
metadata,
List.of(Tuple.tuple("data-stream-1", 2)),
List.of(),
System.currentTimeMillis(),
Settings.EMPTY,
0,
false,
false
);
ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(metadata).build();
var req = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] {});
var response = TransportGetDataStreamsAction.innerOperation(
state,
req,
resolver,
systemIndices,
ClusterSettings.createBuiltInClusterSettings(),
dataStreamGlobalRetentionSettings,
DataStreamFailureStoreSettings.create(
ClusterSettings.createBuiltInClusterSettings(
Settings.builder()
.put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), "data-stream-*")
.build()
)
),
null
);
assertThat(response.getDataStreams(), hasSize(1));
assertThat(response.getDataStreams().getFirst().isFailureStoreEffectivelyEnabled(), is(true));
}
}

View file

@@ -45,8 +45,6 @@
- do:
indices.close:
index: logs-*
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- is_true: acknowledged
- length: { indices: 0 }

View file

@@ -0,0 +1,222 @@
setup:
- requires:
reason: "Data stream options was added in 8.18+"
test_runner_features: [ capabilities, allowed_warnings, contains ]
capabilities:
- method: POST
path: /{index}/_doc
capabilities: [ 'failure_store_status' ]
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
- method: PUT
path: /_cluster/settings
capabilities: [ 'data_stream_failure_store_cluster_setting' ]
- do:
cluster.put_settings:
body:
persistent:
data_streams.failure_store.enabled: '*-matching'
- do:
ingest.put_pipeline:
id: "failing_pipeline"
body: >
{
"processors": [
{
"fail": {
"message" : "pipeline go boom"
}
}
]
}
- do:
indices.put_index_template:
name: index_template_default_fs
body:
index_patterns: default-fs-*
data_stream: {}
template:
settings:
number_of_shards: 1
number_of_replicas: 1
- do:
cluster.put_component_template:
name: component_template_disabled_fs
body:
template:
data_stream_options:
failure_store:
enabled: false
- do:
indices.put_index_template:
name: index_template_disabled_fs
body:
index_patterns: disabled-fs-*
data_stream: {}
composed_of:
- component_template_disabled_fs
template:
settings:
number_of_shards: 1
number_of_replicas: 1
---
teardown:
- do:
indices.delete_data_stream:
name: default-fs-matching
ignore: 404
- do:
indices.delete_data_stream:
name: disabled-fs-matching
ignore: 404
- do:
indices.delete_index_template:
name: index_template_disabled_fs
ignore: 404
- do:
cluster.delete_component_template:
name: component_template_disabled_fs
ignore: 404
- do:
indices.delete_index_template:
name: index_template_default_fs
ignore: 404
- do:
ingest.delete_pipeline:
id: "failing_pipeline"
ignore: 404
- do:
cluster.put_settings:
body:
persistent:
data_streams.failure_store.enabled: null
---
"Redirect ingest failure when auto-creating data stream to failure store when enabled by setting":
- do:
index:
index: default-fs-matching
refresh: true
pipeline: 'failing_pipeline'
body:
'@timestamp': '2020-12-12'
foo: bar
- match: { failure_store: used }
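# the document lands in a failure-store backing index, named .fs-<data-stream>-[<yyyy.MM.dd>-]<generation>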
- match: { _index: '/\.fs-default-fs-matching-(\d{4}\.\d{2}\.\d{2}-)?\d{6}/' }
---
"Redirect ingest failure into pre-existing data stream to failure store when enabled by setting":
- do:
indices.create_data_stream:
name: default-fs-matching
- do:
index:
index: default-fs-matching
refresh: true
pipeline: 'failing_pipeline'
body:
'@timestamp': '2020-12-12'
foo: bar
- match: { failure_store: used }
- match: { _index: '/\.fs-default-fs-matching-(\d{4}\.\d{2}\.\d{2}-)?\d{6}/' }
---
"Do not redirect ingest failure when auto-creating data stream to failure store when enabled by setting but disabled in template":
- do:
index:
index: disabled-fs-matching
refresh: true
pipeline: 'failing_pipeline'
body:
'@timestamp': '2020-12-12'
foo: bar
catch: '/pipeline go boom/'
---
"Do not redirect ingest failure into pre-existing data stream to failure store when enabled by setting but disabled in template":
- do:
indices.create_data_stream:
name: disabled-fs-matching
- do:
index:
index: disabled-fs-matching
refresh: true
pipeline: 'failing_pipeline'
body:
'@timestamp': '2020-12-12'
foo: bar
catch: '/pipeline go boom/'
---
"Redirect mapping failure when auto-creating data stream to failure store when enabled by setting":
- do:
index:
index: default-fs-matching
refresh: true
body:
'@timestamp': 'not a timestamp'
foo: bar
- match: { failure_store: used }
- match: { _index: '/\.fs-default-fs-matching-(\d{4}\.\d{2}\.\d{2}-)?\d{6}/' }
---
"Redirect mapping failure into pre-existing data stream to failure store when enabled by setting":
- do:
indices.create_data_stream:
name: default-fs-matching
- do:
index:
index: default-fs-matching
refresh: true
body:
'@timestamp': 'not a timestamp'
foo: bar
- match: { failure_store: used }
- match: { _index: '/\.fs-default-fs-matching-(\d{4}\.\d{2}\.\d{2}-)?\d{6}/' }
---
"Do not redirect mapping failure when auto-creating data stream to failure store when enabled by setting but disabled in template":
- do:
index:
index: disabled-fs-matching
refresh: true
body:
'@timestamp': 'not a timestamp'
foo: bar
catch: '/failed to parse field/'
---
"Do not redirect mapping failure into pre-existing data stream to failure store when enabled by setting but disabled in template":
- do:
indices.create_data_stream:
name: disabled-fs-matching
- do:
index:
index: disabled-fs-matching
refresh: true
body:
'@timestamp': 'not a timestamp'
foo: bar
catch: '/failed to parse field/'
# See also DataStreamOptionsIT for tests of the interaction between the failure store cluster setting and using
# the /_data_stream/{name}/_options API to explicitly enable and disable the failure store. (At time of writing, these
# can only be done in a Java REST test case, not a YAML one, because the failure store is behind a feature gate and so
# the REST API spec has not been added.)

View file

@@ -230,8 +230,6 @@ teardown:
- do:
indices.close:
index: ".ds-simple-data-stream1-*000001,.ds-simple-data-stream1-*000002"
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- is_true: acknowledged
- do:

View file

@@ -54,8 +54,6 @@ setup:
- do:
indices.close:
index: test_index2
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- do:
indices.create:

View file

@@ -46,6 +46,7 @@ import org.elasticsearch.legacygeo.XShapeCollection;
import org.elasticsearch.legacygeo.builders.ShapeBuilder;
import org.elasticsearch.legacygeo.parsers.ShapeParser;
import org.elasticsearch.legacygeo.query.LegacyGeoShapeQueryProcessor;
import org.elasticsearch.lucene.spatial.CoordinateEncoder;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.locationtech.spatial4j.shape.Point;
@@ -530,6 +531,17 @@ public class LegacyGeoShapeFieldMapper extends AbstractShapeGeometryFieldMapper<
protected Function<List<ShapeBuilder<?, ?, ?>>, List<Object>> getFormatter(String format) {
return GeometryFormatterFactory.getFormatter(format, ShapeBuilder::buildGeometry);
}
@Override
protected boolean isBoundsExtractionSupported() {
// Extracting bounds for geo shapes is not implemented yet.
return false;
}
@Override
protected CoordinateEncoder coordinateEncoder() {
return CoordinateEncoder.GEO;
}
}
private final IndexVersion indexCreatedVersion;

View file

@@ -199,7 +199,6 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
action.start();
assertBusy(() -> assertEquals(testRequest.getMaxRetries() + 1, client.searchAttempts.get()));
assertBusy(() -> assertTrue(listener.isDone()));
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
assertThat(ExceptionsHelper.stackTrace(e), containsString(EsRejectedExecutionException.class.getSimpleName()));
assertNull("There shouldn't be a search attempt pending that we didn't reject", client.lastSearch.get());

View file

@@ -276,23 +276,17 @@ tests:
- class: org.elasticsearch.xpack.ccr.rest.ShardChangesRestIT
method: testShardChangesNoOperation
issue: https://github.com/elastic/elasticsearch/issues/118800
- class: org.elasticsearch.xpack.security.QueryableReservedRolesIT
method: testDeletingAndCreatingSecurityIndexTriggersSynchronization
issue: https://github.com/elastic/elasticsearch/issues/118806
- class: org.elasticsearch.index.engine.RecoverySourcePruneMergePolicyTests
method: testPruneSome
issue: https://github.com/elastic/elasticsearch/issues/118728
- class: org.elasticsearch.smoketest.DocsClientYamlTestSuiteIT
method: test {yaml=reference/indices/shard-stores/line_150}
issue: https://github.com/elastic/elasticsearch/issues/118896
- class: org.elasticsearch.cluster.service.MasterServiceTests
method: testThreadContext
issue: https://github.com/elastic/elasticsearch/issues/118914
- class: org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT
method: test {yaml=indices.create/20_synthetic_source/create index with use_synthetic_source}
issue: https://github.com/elastic/elasticsearch/issues/118955
- class: org.elasticsearch.repositories.blobstore.testkit.analyze.SecureHdfsRepositoryAnalysisRestIT
issue: https://github.com/elastic/elasticsearch/issues/118970
- class: org.elasticsearch.aggregations.bucket.SearchCancellationIT
method: testCancellationDuringTimeSeriesAggregation
issue: https://github.com/elastic/elasticsearch/issues/118992
- class: org.elasticsearch.xpack.security.authc.AuthenticationServiceTests
method: testInvalidToken
issue: https://github.com/elastic/elasticsearch/issues/119019
@ -301,9 +295,16 @@ tests:
issue: https://github.com/elastic/elasticsearch/issues/116777
- class: org.elasticsearch.xpack.security.authc.ldap.ActiveDirectoryRunAsIT
issue: https://github.com/elastic/elasticsearch/issues/115727
- class: org.elasticsearch.cluster.coordination.NodeJoinExecutorTests
method: testSuccess
issue: https://github.com/elastic/elasticsearch/issues/119052
- class: org.elasticsearch.xpack.security.authc.kerberos.KerberosAuthenticationIT
issue: https://github.com/elastic/elasticsearch/issues/118414
- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlClientYamlIT
issue: https://github.com/elastic/elasticsearch/issues/119086
- class: org.elasticsearch.smoketest.DocsClientYamlTestSuiteIT
method: test {yaml=reference/search/search-your-data/retrievers-examples/line_98}
issue: https://github.com/elastic/elasticsearch/issues/119155
- class: org.elasticsearch.xpack.esql.action.EsqlNodeFailureIT
method: testFailureLoadingFields
issue: https://github.com/elastic/elasticsearch/issues/118000
# Examples:
#

View file

@ -49,8 +49,6 @@
- do:
indices.close:
index : test_index
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
# Restore index
- do:

View file

@ -51,8 +51,6 @@
- do:
indices.close:
index : test_index
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
# Restore index
- do:

View file

@ -15,6 +15,8 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import com.carrotsearch.randomizedtesting.annotations.TestCaseOrdering;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.LocalClusterConfigProvider;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
@ -28,12 +30,15 @@ import org.junit.rules.TestRule;
import java.util.Comparator;
import java.util.Locale;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.elasticsearch.test.cluster.util.Version.CURRENT;
import static org.elasticsearch.test.cluster.util.Version.fromString;
import static org.elasticsearch.test.rest.ObjectPath.createFromResponse;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
/**
@ -113,6 +118,12 @@ public abstract class AbstractLuceneIndexCompatibilityTestCase extends ESRestTes
return name + '-' + getTestName().split(" ")[0].toLowerCase(Locale.ROOT);
}
protected Settings repositorySettings() {
return Settings.builder()
.put("location", REPOSITORY_PATH.getRoot().toPath().resolve(suffix("location")).toFile().getPath())
.build();
}
protected static Version clusterVersion() throws Exception {
var response = assertOK(client().performRequest(new Request("GET", "/")));
var responseBody = createFromResponse(response);
@ -121,12 +132,56 @@ public abstract class AbstractLuceneIndexCompatibilityTestCase extends ESRestTes
return version;
}
protected static Version indexLuceneVersion(String indexName) throws Exception {
protected static Version indexVersion(String indexName) throws Exception {
var response = assertOK(client().performRequest(new Request("GET", "/" + indexName + "/_settings")));
int id = Integer.parseInt(createFromResponse(response).evaluate(indexName + ".settings.index.version.created"));
return new Version((byte) ((id / 1000000) % 100), (byte) ((id / 10000) % 100), (byte) ((id / 100) % 100));
}
protected static void indexDocs(String indexName, int numDocs) throws Exception {
var request = new Request("POST", "/_bulk");
var docs = new StringBuilder();
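// The bulk body is NDJSON: an action line ({"index": ...}) followed by the document source on the next line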
IntStream.range(0, numDocs).forEach(n -> docs.append(Strings.format("""
{"index":{"_id":"%s","_index":"%s"}}
{"test":"test"}
""", n, indexName)));
request.setJsonEntity(docs.toString());
var response = assertOK(client().performRequest(request));
assertThat(entityAsMap(response).get("errors"), allOf(notNullValue(), is(false)));
}
protected static void mountIndex(String repository, String snapshot, String indexName, boolean partial, String renamedIndexName)
throws Exception {
var request = new Request("POST", "/_snapshot/" + repository + "/" + snapshot + "/_mount");
request.addParameter("wait_for_completion", "true");
var storage = partial ? "shared_cache" : "full_copy";
request.addParameter("storage", storage);
request.setJsonEntity(Strings.format("""
{
"index": "%s",
"renamed_index": "%s"
}""", indexName, renamedIndexName));
var responseBody = createFromResponse(client().performRequest(request));
assertThat(responseBody.evaluate("snapshot.shards.total"), equalTo((int) responseBody.evaluate("snapshot.shards.successful")));
assertThat(responseBody.evaluate("snapshot.shards.failed"), equalTo(0));
}
protected static void restoreIndex(String repository, String snapshot, String indexName, String renamedIndexName) throws Exception {
var request = new Request("POST", "/_snapshot/" + repository + "/" + snapshot + "/_restore");
request.addParameter("wait_for_completion", "true");
request.setJsonEntity(org.elasticsearch.common.Strings.format("""
{
"indices": "%s",
"include_global_state": false,
"rename_pattern": "(.+)",
"rename_replacement": "%s",
"include_aliases": false
}""", indexName, renamedIndexName));
var responseBody = createFromResponse(client().performRequest(request));
assertThat(responseBody.evaluate("snapshot.shards.total"), equalTo((int) responseBody.evaluate("snapshot.shards.failed")));
assertThat(responseBody.evaluate("snapshot.shards.successful"), equalTo(0));
}
/**
* Execute the test suite with the parameters provided by the {@link #parameters()} in version order.
*/

View file

@ -10,20 +10,18 @@
package org.elasticsearch.lucene;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.cluster.util.Version;
import java.util.stream.IntStream;
import static org.elasticsearch.test.rest.ObjectPath.createFromResponse;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
public class LuceneCompatibilityIT extends AbstractLuceneIndexCompatibilityTestCase {
@ -35,22 +33,19 @@ public class LuceneCompatibilityIT extends AbstractLuceneIndexCompatibilityTestC
super(version);
}
/**
* Creates an index and a snapshot on N-2, then restores the snapshot on N.
*/
public void testRestoreIndex() throws Exception {
final String repository = suffix("repository");
final String snapshot = suffix("snapshot");
final String index = suffix("index");
final int numDocs = 1234;
logger.debug("--> registering repository [{}]", repository);
registerRepository(
client(),
repository,
FsRepository.TYPE,
true,
Settings.builder().put("location", REPOSITORY_PATH.getRoot().getPath()).build()
);
if (VERSION_MINUS_2.equals(clusterVersion())) {
logger.debug("--> registering repository [{}]", repository);
registerRepository(client(), repository, FsRepository.TYPE, true, repositorySettings());
logger.debug("--> creating index [{}]", index);
createIndex(
client(),
@ -63,17 +58,7 @@ public class LuceneCompatibilityIT extends AbstractLuceneIndexCompatibilityTestC
);
logger.debug("--> indexing [{}] docs in [{}]", numDocs, index);
final var bulks = new StringBuilder();
IntStream.range(0, numDocs).forEach(n -> bulks.append(Strings.format("""
{"index":{"_id":"%s","_index":"%s"}}
{"test":"test"}
""", n, index)));
var bulkRequest = new Request("POST", "/_bulk");
bulkRequest.setJsonEntity(bulks.toString());
var bulkResponse = client().performRequest(bulkRequest);
assertOK(bulkResponse);
assertThat(entityAsMap(bulkResponse).get("errors"), allOf(notNullValue(), is(false)));
indexDocs(index, numDocs);
logger.debug("--> creating snapshot [{}]", snapshot);
createSnapshot(client(), repository, snapshot, true);
@ -83,7 +68,7 @@ public class LuceneCompatibilityIT extends AbstractLuceneIndexCompatibilityTestC
if (VERSION_MINUS_1.equals(clusterVersion())) {
ensureGreen(index);
assertThat(indexLuceneVersion(index), equalTo(VERSION_MINUS_2));
assertThat(indexVersion(index), equalTo(VERSION_MINUS_2));
assertDocCount(client(), index, numDocs);
logger.debug("--> deleting index [{}]", index);
@ -93,9 +78,9 @@ public class LuceneCompatibilityIT extends AbstractLuceneIndexCompatibilityTestC
if (VERSION_CURRENT.equals(clusterVersion())) {
var restoredIndex = suffix("index-restored");
logger.debug("--> restoring index [{}] as archive [{}]", index, restoredIndex);
logger.debug("--> restoring index [{}] as [{}]", index, restoredIndex);
// Restoring the archive will fail as Elasticsearch does not support reading N-2 yet
// Restoring the index will fail as Elasticsearch does not support reading N-2 yet
var request = new Request("POST", "/_snapshot/" + repository + "/" + snapshot + "/_restore");
request.addParameter("wait_for_completion", "true");
request.setJsonEntity(Strings.format("""
@ -106,9 +91,20 @@ public class LuceneCompatibilityIT extends AbstractLuceneIndexCompatibilityTestC
"rename_replacement": "%s",
"include_aliases": false
}""", index, restoredIndex));
var responseBody = createFromResponse(client().performRequest(request));
assertThat(responseBody.evaluate("snapshot.shards.total"), equalTo((int) responseBody.evaluate("snapshot.shards.failed")));
assertThat(responseBody.evaluate("snapshot.shards.successful"), equalTo(0));
var responseException = expectThrows(ResponseException.class, () -> client().performRequest(request));
assertEquals(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), responseException.getResponse().getStatusLine().getStatusCode());
assertThat(
responseException.getMessage(),
allOf(
containsString("cannot restore index [[" + index),
containsString("because it cannot be upgraded"),
containsString("has current compatibility version [" + VERSION_MINUS_2 + '-' + VERSION_MINUS_1.getMajor() + ".0.0]"),
containsString("but the minimum compatible version is [" + VERSION_MINUS_1.getMajor() + ".0.0]."),
containsString("It should be re-indexed in Elasticsearch " + VERSION_MINUS_1.getMajor() + ".x"),
containsString("before upgrading to " + VERSION_CURRENT)
)
);
}
}
}

View file

@ -9,21 +9,13 @@
package org.elasticsearch.lucene;
import org.elasticsearch.client.Request;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.test.cluster.util.Version;
import java.util.stream.IntStream;
import static org.elasticsearch.test.rest.ObjectPath.createFromResponse;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
public class SearchableSnapshotCompatibilityIT extends AbstractLuceneIndexCompatibilityTestCase {
@ -37,24 +29,19 @@ public class SearchableSnapshotCompatibilityIT extends AbstractLuceneIndexCompat
super(version);
}
// TODO Add a test to mount the N-2 index on N-1 and then search it on N
/**
* Creates an index and a snapshot on N-2, then mounts the snapshot on N.
*/
public void testSearchableSnapshot() throws Exception {
final String repository = suffix("repository");
final String snapshot = suffix("snapshot");
final String index = suffix("index");
final int numDocs = 1234;
logger.debug("--> registering repository [{}]", repository);
registerRepository(
client(),
repository,
FsRepository.TYPE,
true,
Settings.builder().put("location", REPOSITORY_PATH.getRoot().getPath()).build()
);
if (VERSION_MINUS_2.equals(clusterVersion())) {
logger.debug("--> registering repository [{}]", repository);
registerRepository(client(), repository, FsRepository.TYPE, true, repositorySettings());
logger.debug("--> creating index [{}]", index);
createIndex(
client(),
@ -67,17 +54,7 @@ public class SearchableSnapshotCompatibilityIT extends AbstractLuceneIndexCompat
);
logger.debug("--> indexing [{}] docs in [{}]", numDocs, index);
final var bulks = new StringBuilder();
IntStream.range(0, numDocs).forEach(n -> bulks.append(Strings.format("""
{"index":{"_id":"%s","_index":"%s"}}
{"test":"test"}
""", n, index)));
var bulkRequest = new Request("POST", "/_bulk");
bulkRequest.setJsonEntity(bulks.toString());
var bulkResponse = client().performRequest(bulkRequest);
assertOK(bulkResponse);
assertThat(entityAsMap(bulkResponse).get("errors"), allOf(notNullValue(), is(false)));
indexDocs(index, numDocs);
logger.debug("--> creating snapshot [{}]", snapshot);
createSnapshot(client(), repository, snapshot, true);
@ -87,7 +64,7 @@ public class SearchableSnapshotCompatibilityIT extends AbstractLuceneIndexCompat
if (VERSION_MINUS_1.equals(clusterVersion())) {
ensureGreen(index);
assertThat(indexLuceneVersion(index), equalTo(VERSION_MINUS_2));
assertThat(indexVersion(index), equalTo(VERSION_MINUS_2));
assertDocCount(client(), index, numDocs);
logger.debug("--> deleting index [{}]", index);
@ -98,20 +75,75 @@ public class SearchableSnapshotCompatibilityIT extends AbstractLuceneIndexCompat
if (VERSION_CURRENT.equals(clusterVersion())) {
var mountedIndex = suffix("index-mounted");
logger.debug("--> mounting index [{}] as [{}]", index, mountedIndex);
mountIndex(repository, snapshot, index, randomBoolean(), mountedIndex);
// Mounting the index will fail as Elasticsearch does not support reading N-2 yet
var request = new Request("POST", "/_snapshot/" + repository + "/" + snapshot + "/_mount");
request.addParameter("wait_for_completion", "true");
var storage = randomBoolean() ? "shared_cache" : "full_copy";
request.addParameter("storage", storage);
request.setJsonEntity(Strings.format("""
{
"index": "%s",
"renamed_index": "%s"
}""", index, mountedIndex));
var responseBody = createFromResponse(client().performRequest(request));
assertThat(responseBody.evaluate("snapshot.shards.total"), equalTo((int) responseBody.evaluate("snapshot.shards.failed")));
assertThat(responseBody.evaluate("snapshot.shards.successful"), equalTo(0));
ensureGreen(mountedIndex);
assertThat(indexVersion(mountedIndex), equalTo(VERSION_MINUS_2));
assertDocCount(client(), mountedIndex, numDocs);
logger.debug("--> adding replica to test peer-recovery");
updateIndexSettings(mountedIndex, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1));
ensureGreen(mountedIndex);
}
}
/**
* Creates an index and a snapshot on N-2, mounts the snapshot on N-1 and then upgrades to N.
*/
public void testSearchableSnapshotUpgrade() throws Exception {
final String mountedIndex = suffix("index-mounted");
final String repository = suffix("repository");
final String snapshot = suffix("snapshot");
final String index = suffix("index");
final int numDocs = 4321;
if (VERSION_MINUS_2.equals(clusterVersion())) {
logger.debug("--> registering repository [{}]", repository);
registerRepository(client(), repository, FsRepository.TYPE, true, repositorySettings());
logger.debug("--> creating index [{}]", index);
createIndex(
client(),
index,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build()
);
logger.debug("--> indexing [{}] docs in [{}]", numDocs, index);
indexDocs(index, numDocs);
logger.debug("--> creating snapshot [{}]", snapshot);
createSnapshot(client(), repository, snapshot, true);
logger.debug("--> deleting index [{}]", index);
deleteIndex(index);
return;
}
if (VERSION_MINUS_1.equals(clusterVersion())) {
logger.debug("--> mounting index [{}] as [{}]", index, mountedIndex);
mountIndex(repository, snapshot, index, randomBoolean(), mountedIndex);
ensureGreen(mountedIndex);
assertThat(indexVersion(mountedIndex), equalTo(VERSION_MINUS_2));
assertDocCount(client(), mountedIndex, numDocs);
return;
}
if (VERSION_CURRENT.equals(clusterVersion())) {
ensureGreen(mountedIndex);
assertThat(indexVersion(mountedIndex), equalTo(VERSION_MINUS_2));
assertDocCount(client(), mountedIndex, numDocs);
logger.debug("--> adding replica to test peer-recovery");
updateIndexSettings(mountedIndex, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1));
ensureGreen(mountedIndex);
}
}
}

View file

@ -59,6 +59,15 @@ tasks.named("yamlRestCompatTestTransform").configure ({ task ->
task.replaceValueInMatch("profile.shards.0.dfs.knn.0.query.0.description", "DocAndScoreQuery[0,...][0.009673266,...],0.009673266", "dfs knn vector profiling with vector_operations_count")
task.skipTest("cat.aliases/10_basic/Deprecated local parameter", "CAT APIs not covered by compatibility policy")
task.skipTest("cat.shards/10_basic/Help", "sync_id is removed in 9.0")
task.skipTest("tsdb/20_mapping/exact match object type", "skip until pr/116687 gets backported")
task.skipTest("tsdb/25_id_generation/delete over _bulk", "skip until pr/116687 gets backported")
task.skipTest("tsdb/80_index_resize/split", "skip until pr/116687 gets backported")
task.skipTest("tsdb/90_unsupported_operations/noop update", "skip until pr/116687 gets backported")
task.skipTest("tsdb/90_unsupported_operations/regular update", "skip until pr/116687 gets backported")
task.skipTest("tsdb/90_unsupported_operations/search with routing", "skip until pr/116687 gets backported")
task.skipTest("tsdb/90_unsupported_operations/index with routing over _bulk", "skip until pr/116687 gets backported")
task.skipTest("tsdb/90_unsupported_operations/update over _bulk", "skip until pr/116687 gets backported")
task.skipTest("tsdb/90_unsupported_operations/index with routing", "skip until pr/116687 gets backported")
task.skipTest("search/500_date_range/from, to, include_lower, include_upper deprecated", "deprecated parameters are removed in 9.0")
task.skipTest("tsdb/20_mapping/stored source is supported", "no longer serialize source_mode")
task.skipTest("tsdb/20_mapping/Synthetic source", "no longer serialize source_mode")

View file

@ -388,8 +388,6 @@
- do:
indices.close:
index: test_index
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- do:
cat.aliases:
@ -425,8 +423,6 @@
- do:
indices.close:
index: test_index
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- do:
cat.aliases: {}

View file

@ -87,8 +87,6 @@
- do:
indices.close:
index: index-2
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- is_true: acknowledged
- do:
@ -132,8 +130,6 @@
- do:
indices.close:
index: index-2
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- is_true: acknowledged
- do:
@ -278,8 +274,6 @@
- do:
indices.close:
index: bar
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- do:
cat.indices:

View file

@ -97,8 +97,6 @@
- do:
indices.close:
index: index2
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- is_true: acknowledged
- do:

View file

@ -97,8 +97,6 @@
- do:
indices.close:
index: index1
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- do:
catch: bad_request

View file

@ -69,8 +69,6 @@
- do:
indices.close:
index: test_closed
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- match: { acknowledged: true }

View file

@ -219,8 +219,6 @@
- do:
indices.close:
index: index-2
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- is_true: acknowledged
# closing index-2 turns the cluster health back to green
@ -297,8 +295,6 @@
- do:
indices.close:
index: index-2
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- is_true: acknowledged
- do:

View file

@ -25,8 +25,6 @@ setup:
- do:
indices.close:
index: index-2
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- do:
cluster.health:

View file

@ -27,8 +27,6 @@ setup:
- do:
indices.close:
index: test_close_index
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
---
"Test expand_wildcards parameter on closed, open indices and both":

View file

@ -2036,14 +2036,12 @@ create index with use_synthetic_source:
- is_true: test.settings.index.recovery.use_synthetic_source
- do:
bulk:
index:
index: test
id: 1
refresh: true
body:
- '{ "create": { } }'
- '{ "field": "aaaa" }'
- '{ "create": { } }'
- '{ "field": "bbbb" }'
body: { foo: bar }
- match: { _version: 1 }
- do:
indices.disk_usage:

View file

@ -40,8 +40,6 @@ setup:
- do:
indices.close:
index: test_index_3
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- do:
cluster.health:

View file

@ -297,8 +297,6 @@ setup:
- do:
indices.close:
index: test_index_2
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- do:
indices.get_alias:

View file

@ -55,8 +55,6 @@ setup:
- do:
indices.close:
index: test-xyy
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- do:
cluster.health:

View file

@ -15,8 +15,6 @@
- do:
indices.close:
index: test_index
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- is_true: acknowledged
- do:
@ -55,8 +53,6 @@
- do:
indices.close:
index: test_index
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- is_true: acknowledged
- do:
@ -113,8 +109,6 @@
- do:
indices.close:
index: "index_*"
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- match: { acknowledged: true }
- match: { shards_acknowledged: true }
@ -123,11 +117,14 @@
- match: { indices.index_3.closed: true }
---
"?wait_for_active_shards=index-setting is deprecated":
"?wait_for_active_shards=index-setting is removed":
- requires:
cluster_features: ["gte_v8.0.0"]
reason: "required deprecation warning is only emitted in 8.0 and later"
test_runner_features: ["warnings"]
reason: "Parameter value 'index-setting' of 'wait-for-active-shards' is rejected with specialised error."
test_runner_features: [capabilities]
capabilities:
- method: POST
path: /{index}/_close
capabilities: [ wait-for-active-shards-index-setting-removed ]
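# Runs only against clusters that advertise this capability, i.e. where 'index-setting' is rejected outright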
- do:
indices.create:
@ -137,8 +134,7 @@
number_of_replicas: 0
- do:
catch: /The 'index-setting' value for parameter 'wait_for_active_shards' is the default behaviour and this configuration value is not supported anymore. Please remove 'wait_for_active_shards=index-setting'/
indices.close:
index: "index_*"
wait_for_active_shards: index-setting
warnings:
- "?wait_for_active_shards=index-setting is now the default behaviour; the 'index-setting' value for this parameter should no longer be used since it will become unsupported in version 9"

View file

@ -24,8 +24,6 @@ setup:
- do:
indices.close:
index: _all
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- is_true: acknowledged
- do:
@ -57,8 +55,6 @@ setup:
- do:
indices.close:
index: test_*
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- is_true: acknowledged
- do:
@ -90,8 +86,6 @@ setup:
- do:
indices.close:
index: '*'
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- is_true: acknowledged
- do:

View file

@ -92,8 +92,6 @@ setup:
- do:
indices.close:
index: test-index
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- do:
indices.put_settings:

View file

@ -59,8 +59,6 @@
- do:
indices.close:
index: test_2
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- is_true: acknowledged
- do:

View file

@ -0,0 +1,33 @@
---
test recovery empty index with use_synthetic_source:
- requires:
cluster_features: ["mapper.synthetic_recovery_source"]
reason: requires synthetic recovery source
- do:
indices.create:
index: test
body:
settings:
index:
number_of_replicas: 0
recovery:
use_synthetic_source: true
mapping:
source:
mode: synthetic
- do:
indices.get_settings: {}
- match: { test.settings.index.mapping.source.mode: synthetic }
- is_true: test.settings.index.recovery.use_synthetic_source
- do:
indices.put_settings:
index: test
body:
index.number_of_replicas: 1
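# Raising replicas from 0 to 1 triggers a peer recovery that must work from the synthetic recovery source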
- do:
cluster.health:
wait_for_events: languid

View file

@ -20,8 +20,6 @@ setup:
- do:
indices.close:
index: index2
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
---
"Resolve cluster with indices and aliases":

View file

@ -24,8 +24,6 @@ setup:
- do:
indices.close:
index: test_index2
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- do:
indices.create:

View file

@ -105,8 +105,6 @@
- do:
indices.close:
index: index1
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- do:
catch: bad_request

View file

@ -513,6 +513,48 @@ routing path not allowed in logs mode:
- match: { error.type: "illegal_argument_exception" }
- match: { error.reason: "[index.routing_path] requires [index.mode=time_series]" }
---
routing path allowed in logs mode with routing on sort fields:
- requires:
cluster_features: [ "routing.logsb_route_on_sort_fields" ]
reason: introduction of route on index sorting fields
- do:
indices.create:
index: test
body:
settings:
index:
mode: logsdb
number_of_replicas: 0
number_of_shards: 2
routing_path: [ host.name, agent_id ]
logsdb:
route_on_sort_fields: true
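# With logsdb.route_on_sort_fields enabled, an explicit routing_path is permitted in logs mode (contrast with the test above)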
mappings:
properties:
"@timestamp":
type: date
host.name:
type: keyword
agent_id:
type: keyword
process_id:
type: integer
http_method:
type: keyword
message:
type: text
- do:
indices.get_settings:
index: test
- is_true: test
- match: { test.settings.index.mode: logsdb }
- match: { test.settings.index.logsdb.route_on_sort_fields: "true" }
- match: { test.settings.index.routing_path: [ host.name, agent_id ] }
---
start time not allowed in logs mode:
- requires:

View file

@ -37,8 +37,6 @@
- do:
indices.close:
index: index_closed
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- do:
catch: /index_closed_exception/

View file

@ -46,8 +46,6 @@ setup:
- do:
indices.close:
index : test_index
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- do:
snapshot.restore:
@ -92,8 +90,6 @@ setup:
- do:
indices.close:
index : test_index
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
- do:
snapshot.restore:

View file

@ -127,7 +127,7 @@ exact match object type:
reason: routing_path error message updated in 8.14.0
- do:
catch: '/All fields that match routing_path must be configured with \[time_series_dimension: true\] or flattened fields with a list of dimensions in \[time_series_dimensions\] and without the \[script\] parameter. \[dim\] was \[object\]./'
catch: '/All fields that match routing_path must be .*flattened fields.* \[dim\] was \[object\]./'
indices.create:
index: tsdb_index
body:

View file

@ -427,7 +427,7 @@ delete over _bulk:
- match: {items.0.delete.result: deleted}
- match: {items.1.delete.result: deleted}
- match: {items.2.delete.status: 404}
- match: {items.2.delete.error.reason: "invalid id [not found ++ not found] for index [id_generation_test] in time series mode"}
- match: {items.2.delete.error.reason: '/invalid\ id\ \[not\ found\ \+\+\ not\ found\]\ for\ index\ \[id_generation_test\]\ in\ time.series\ mode/'}
---
routing_path matches deep object:

View file

@ -120,8 +120,6 @@ teardown:
- do:
indices.close:
index : test_index
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
# Restore index
- do:

View file

@ -95,7 +95,7 @@ split:
reason: tsdb indexing changed in 8.2.0
- do:
catch: /index-split is not supported because the destination index \[test\] is in time series mode/
catch: /index-split is not supported because the destination index \[test\] is in time.series mode/
indices.split:
index: test
target: test_split

View file

@ -75,7 +75,7 @@ index with routing:
reason: tsdb indexing changed in 8.2.0
- do:
catch: /specifying routing is not supported because the destination index \[test\] is in time series mode/
catch: /specifying routing is not supported because the destination index \[test\] is in time.series mode/
index:
index: test
routing: foo
@ -104,7 +104,7 @@ index with routing over _bulk:
body:
- '{"index": {"routing": "foo"}}'
- '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
- match: {items.0.index.error.reason: "specifying routing is not supported because the destination index [test] is in time series mode"}
- match: {items.0.index.error.reason: '/specifying\ routing\ is\ not\ supported\ because\ the\ destination\ index\ \[test\]\ is\ in\ time.series\ mode/'}
---
noop update:
@ -120,7 +120,7 @@ noop update:
- length: {hits.hits: 1}
- do:
catch: /update is not supported because the destination index \[test\] is in time series mode/
catch: /update is not supported because the destination index \[test\] is in time.series mode/
update:
index: test
id: "1"
@ -136,7 +136,7 @@ regular update:
# We fail even though the document isn't found.
- do:
catch: /update is not supported because the destination index \[test\] is in time series mode/
catch: /update is not supported because the destination index \[test\] is in time.series mode/
update:
index: test
id: "1"
@ -165,7 +165,7 @@ update over _bulk:
body:
- '{"update": {"_id": 1}}'
- '{"doc":{"@timestamp": "2021-04-28T18:03:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}}}'
- match: {items.0.update.error.reason: "update is not supported because the destination index [test] is in time series mode"}
- match: {items.0.update.error.reason: '/update\ is\ not\ supported\ because\ the\ destination\ index\ \[test\]\ is\ in\ time.series\ mode/'}
---
search with routing:
@ -175,7 +175,7 @@ search with routing:
# We fail even though the document isn't found.
- do:
catch: /searching with a specified routing is not supported because the destination index \[test\] is in time series mode/
catch: /searching with a specified routing is not supported because the destination index \[test\] is in time.series mode/
search:
index: test
routing: rrrr

View file

@ -15,6 +15,7 @@ import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheReque
import org.elasticsearch.action.admin.indices.cache.clear.TransportClearIndicesCacheAction;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
@ -157,9 +158,11 @@ public class IndicesRequestIT extends ESIntegTestCase {
for (int i = 0; i < numIndices; i++) {
indices.add("test" + i);
}
for (String index : indices) {
assertAcked(prepareCreate(index).addAlias(new Alias(index + "-alias")));
}
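// Create all the test indices with one assertAcked call rather than a call per index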
assertAcked(
indices.stream()
.map(index -> prepareCreate(index).addAlias(new Alias(index + "-alias")))
.toArray(CreateIndexRequestBuilder[]::new)
);
ensureGreen();
}

View file

@ -252,11 +252,13 @@ public class CancellableTasksIT extends ESIntegTestCase {
if (waitForCompletion) {
assertFalse(cancelFuture.isDone());
} else {
assertBusy(() -> assertTrue(cancelFuture.isDone()));
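// get() waits for completion and surfaces any failure directly, unlike polling isDone() in assertBusy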
cancelFuture.get();
}
allowEntireRequest(rootRequest);
waitForRootTask(mainTaskFuture, false);
if (waitForCompletion) {
cancelFuture.actionGet();
}
ensureBansAndCancellationsConsistency();
}

View file

@ -158,8 +158,7 @@ public class CreateIndexIT extends ESIntegTestCase {
}
public void testTwoEmptyEqualMappings() throws Exception {
assertAcked(prepareCreate("test1"));
assertAcked(prepareCreate("test2").setMapping(XContentFactory.jsonBuilder().startObject().endObject()));
assertAcked(prepareCreate("test1"), prepareCreate("test2").setMapping(XContentFactory.jsonBuilder().startObject().endObject()));
FieldCapabilitiesRequest fieldCapsReq1 = new FieldCapabilitiesRequest();
fieldCapsReq1.indices("test1");
fieldCapsReq1.fields("*");

View file

@ -682,9 +682,11 @@ public class RolloverIT extends ESIntegTestCase {
final String openNonwriteIndex = "open-index-nonwrite";
final String closedIndex = "closed-index-nonwrite";
final String writeIndexPrefix = "write-index-";
assertAcked(prepareCreate(openNonwriteIndex).addAlias(new Alias(aliasName)).get());
assertAcked(prepareCreate(closedIndex).addAlias(new Alias(aliasName)).get());
assertAcked(prepareCreate(writeIndexPrefix + "000001").addAlias(new Alias(aliasName).writeIndex(true)).get());
assertAcked(
prepareCreate(openNonwriteIndex).addAlias(new Alias(aliasName)),
prepareCreate(closedIndex).addAlias(new Alias(aliasName)),
prepareCreate(writeIndexPrefix + "000001").addAlias(new Alias(aliasName).writeIndex(true))
);
ensureGreen();
index(closedIndex, null, "{\"foo\": \"bar\"}");
@ -707,17 +709,18 @@ public class RolloverIT extends ESIntegTestCase {
final String openNonwriteIndex = "open-index-nonwrite";
final String closedIndex = "closed-index-nonwrite";
final String writeIndexPrefix = "write-index-";
assertAcked(prepareCreate(openNonwriteIndex).addAlias(new Alias(aliasName)).get());
assertAcked(prepareCreate(closedIndex).addAlias(new Alias(aliasName)).get());
assertAcked(prepareCreate(writeIndexPrefix + "000001").addAlias(new Alias(aliasName).writeIndex(true)).get());
assertAcked(
prepareCreate(openNonwriteIndex).addAlias(new Alias(aliasName)),
prepareCreate(closedIndex).addAlias(new Alias(aliasName)),
prepareCreate(writeIndexPrefix + "000001").addAlias(new Alias(aliasName).writeIndex(true))
);
ensureGreen(openNonwriteIndex, closedIndex, writeIndexPrefix + "000001");
index(closedIndex, null, "{\"foo\": \"bar\"}");
index(aliasName, null, "{\"foo\": \"bar\"}");
index(aliasName, null, "{\"foo\": \"bar\"}");
refresh(aliasName);
assertAcked(indicesAdmin().prepareClose(closedIndex).get());
assertAcked(indicesAdmin().prepareClose(writeIndexPrefix + "000001").get());
assertAcked(indicesAdmin().prepareClose(closedIndex, writeIndexPrefix + "000001").get());
ensureGreen(aliasName);
RolloverResponse rolloverResponse = indicesAdmin().prepareRolloverIndex(aliasName)

View file

@ -307,11 +307,15 @@ public class TransportSearchIT extends ESIntegTestCase {
public void testWaitForRefreshIndexValidation() throws Exception {
int numberOfShards = randomIntBetween(3, 10);
assertAcked(prepareCreate("test1").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)));
assertAcked(prepareCreate("test2").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)));
assertAcked(prepareCreate("test3").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)));
indicesAdmin().prepareAliases().addAlias("test1", "testAlias").get();
indicesAdmin().prepareAliases().addAlias(new String[] { "test2", "test3" }, "testFailedAlias").get();
assertAcked(
prepareCreate("test1").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)),
prepareCreate("test2").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)),
prepareCreate("test3").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards))
);
assertAcked(
indicesAdmin().prepareAliases().addAlias("test1", "testAlias"),
indicesAdmin().prepareAliases().addAlias(new String[] { "test2", "test3" }, "testFailedAlias")
);
long[] validCheckpoints = new long[numberOfShards];
Arrays.fill(validCheckpoints, SequenceNumbers.UNASSIGNED_SEQ_NO);
@ -376,8 +380,10 @@ public class TransportSearchIT extends ESIntegTestCase {
try {
final int numPrimaries1 = randomIntBetween(2, 10);
final int numPrimaries2 = randomIntBetween(1, 10);
assertAcked(prepareCreate("test1").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numPrimaries1)));
assertAcked(prepareCreate("test2").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numPrimaries2)));
assertAcked(
prepareCreate("test1").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numPrimaries1)),
prepareCreate("test2").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numPrimaries2))
);
// no exception
prepareSearch("test1").get().decRef();

View file

@ -531,8 +531,10 @@ public class GetTermVectorsIT extends AbstractTermVectorsTestCase {
public void testDuelWithAndWithoutTermVectors() throws IOException, ExecutionException, InterruptedException {
// setup indices
String[] indexNames = new String[] { "with_tv", "without_tv" };
assertAcked(prepareCreate(indexNames[0]).setMapping("field1", "type=text,term_vector=with_positions_offsets,analyzer=keyword"));
assertAcked(prepareCreate(indexNames[1]).setMapping("field1", "type=text,term_vector=no,analyzer=keyword"));
assertAcked(
prepareCreate(indexNames[1]).setMapping("field1", "type=text,term_vector=no,analyzer=keyword"),
prepareCreate(indexNames[0]).setMapping("field1", "type=text,term_vector=with_positions_offsets,analyzer=keyword")
);
ensureGreen();
// index documents with and without term vectors
@ -1074,9 +1076,7 @@ public class GetTermVectorsIT extends AbstractTermVectorsTestCase {
"type=text,term_vector=with_positions_offsets,analyzer=my_analyzer",
"field2",
"type=text,term_vector=with_positions_offsets,analyzer=keyword"
)
);
assertAcked(
),
prepareCreate(indexNames[1]).setSettings(builder.build())
.setMapping("field1", "type=keyword,normalizer=my_normalizer", "field2", "type=keyword")
);

View file

@ -323,10 +323,8 @@ public class IndexAliasesIT extends ESIntegTestCase {
}
public void testSearchingFilteringAliasesTwoIndices() throws Exception {
logger.info("--> creating index [test1]");
assertAcked(prepareCreate("test1").setMapping("name", "type=text"));
logger.info("--> creating index [test2]");
assertAcked(prepareCreate("test2").setMapping("name", "type=text"));
logger.info("--> creating indices [test1, test2]");
assertAcked(prepareCreate("test1").setMapping("name", "type=text"), prepareCreate("test2").setMapping("name", "type=text"));
ensureGreen();
logger.info("--> adding filtering aliases to index [test1]");
@ -525,8 +523,7 @@ public class IndexAliasesIT extends ESIntegTestCase {
public void testDeletingByQueryFilteringAliases() throws Exception {
logger.info("--> creating index [test1] and [test2");
assertAcked(prepareCreate("test1").setMapping("name", "type=text"));
assertAcked(prepareCreate("test2").setMapping("name", "type=text"));
assertAcked(prepareCreate("test1").setMapping("name", "type=text"), prepareCreate("test2").setMapping("name", "type=text"));
ensureGreen();
logger.info("--> adding filtering aliases to index [test1]");
@ -580,8 +577,7 @@ public class IndexAliasesIT extends ESIntegTestCase {
public void testDeleteAliases() throws Exception {
logger.info("--> creating index [test1] and [test2]");
assertAcked(prepareCreate("test1").setMapping("name", "type=text"));
assertAcked(prepareCreate("test2").setMapping("name", "type=text"));
assertAcked(prepareCreate("test1").setMapping("name", "type=text"), prepareCreate("test2").setMapping("name", "type=text"));
ensureGreen();
logger.info("--> adding filtering aliases to index [test1]");
@ -619,8 +615,7 @@ public class IndexAliasesIT extends ESIntegTestCase {
}
logger.info("--> creating index [foo_foo] and [bar_bar]");
assertAcked(prepareCreate("foo_foo"));
assertAcked(prepareCreate("bar_bar"));
assertAcked(prepareCreate("foo_foo"), prepareCreate("bar_bar"));
ensureGreen();
logger.info("--> adding [foo] alias to [foo_foo] and [bar_bar]");
@ -1163,13 +1158,15 @@ public class IndexAliasesIT extends ESIntegTestCase {
}
}
public void testAliasActionRemoveIndex() throws InterruptedException, ExecutionException {
assertAcked(prepareCreate("foo_foo"));
assertAcked(prepareCreate("bar_bar"));
assertAliasesVersionIncreases(new String[] { "foo_foo", "bar_bar" }, () -> {
assertAcked(indicesAdmin().prepareAliases().addAlias("foo_foo", "foo"));
assertAcked(indicesAdmin().prepareAliases().addAlias("bar_bar", "foo"));
});
public void testAliasActionRemoveIndex() {
assertAcked(prepareCreate("foo_foo"), prepareCreate("bar_bar"));
assertAliasesVersionIncreases(
new String[] { "foo_foo", "bar_bar" },
() -> assertAcked(
indicesAdmin().prepareAliases().addAlias("bar_bar", "foo"),
indicesAdmin().prepareAliases().addAlias("foo_foo", "foo")
)
);
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, indicesAdmin().prepareAliases().removeIndex("foo"));
assertEquals(

View file

@ -39,8 +39,10 @@ public class UpdateShardAllocationSettingsIT extends ESIntegTestCase {
// we test with 2 shards since otherwise it's pretty fragile if there are differences in the number of shards such that
// all shards are relocated to the second node, which is not what we want here. It's solely a test for the settings to take effect
final int numShards = 2;
assertAcked(prepareCreate("test").setSettings(indexSettings(numShards, 0)));
assertAcked(prepareCreate("test_1").setSettings(indexSettings(numShards, 0)));
assertAcked(
prepareCreate("test").setSettings(indexSettings(numShards, 0)),
prepareCreate("test_1").setSettings(indexSettings(numShards, 0))
);
ensureGreen();
assertAllShardsOnNodes("test", firstNode);
assertAllShardsOnNodes("test_1", firstNode);

View file

@ -321,9 +321,11 @@ public class IndicesRequestCacheIT extends ESIntegTestCase {
public void testQueryRewriteDatesWithNow() throws Exception {
Client client = client();
Settings settings = indexSettings(1, 0).put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true).build();
assertAcked(indicesAdmin().prepareCreate("index-1").setMapping("d", "type=date").setSettings(settings).get());
assertAcked(indicesAdmin().prepareCreate("index-2").setMapping("d", "type=date").setSettings(settings).get());
assertAcked(indicesAdmin().prepareCreate("index-3").setMapping("d", "type=date").setSettings(settings).get());
assertAcked(
indicesAdmin().prepareCreate("index-1").setMapping("d", "type=date").setSettings(settings),
indicesAdmin().prepareCreate("index-2").setMapping("d", "type=date").setSettings(settings),
indicesAdmin().prepareCreate("index-3").setMapping("d", "type=date").setSettings(settings)
);
ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
DateFormatter formatter = DateFormatter.forPattern("strict_date_optional_time");
indexRandom(

View file

@ -81,9 +81,10 @@ public class SimpleGetFieldMappingsIT extends ESIntegTestCase {
}
public void testGetFieldMappings() throws Exception {
assertAcked(prepareCreate("indexa").setMapping(getMappingForType()));
assertAcked(indicesAdmin().prepareCreate("indexb").setMapping(getMappingForType()));
assertAcked(
prepareCreate("indexa").setMapping(getMappingForType()),
indicesAdmin().prepareCreate("indexb").setMapping(getMappingForType())
);
// Get mappings by full name
GetFieldMappingsResponse response = indicesAdmin().prepareGetFieldMappings("indexa").setFields("field1", "obj.subfield").get();

View file

@ -336,8 +336,10 @@ public class NodeIndexingMetricsIT extends ESIntegTestCase {
plugin.resetMeter();
final int numberOfShards = randomIntBetween(1, 5);
assertAcked(prepareCreate("test-one", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)).get());
assertAcked(prepareCreate("test-two", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)).get());
assertAcked(
prepareCreate("test-one", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)),
prepareCreate("test-two", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1))
);
final BulkRequest bulkRequestOne = new BulkRequest();
final int batchCountOne = randomIntBetween(50, 100);
@ -397,8 +399,10 @@ public class NodeIndexingMetricsIT extends ESIntegTestCase {
ensureStableCluster(2);
// for simplicity do not mix small and big documents in single index/shard
assertAcked(prepareCreate("test-index-one", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)).get());
assertAcked(prepareCreate("test-index-two", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)).get());
assertAcked(
prepareCreate("test-index-one", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)),
prepareCreate("test-index-two", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1))
);
final TestTelemetryPlugin plugin = internalCluster().getInstance(PluginsService.class, dataNode)
.filterPlugins(TestTelemetryPlugin.class)

View file

@ -119,10 +119,10 @@ public class TopHitsIT extends ESIntegTestCase {
@Override
public void setupSuiteScopeCluster() throws Exception {
assertAcked(prepareCreate("idx").setMapping(TERMS_AGGS_FIELD, "type=keyword", "text", "type=text,store=true"));
assertAcked(prepareCreate("field-collapsing").setMapping("group", "type=keyword"));
createIndex("empty");
assertAcked(
prepareCreate("idx").setMapping(TERMS_AGGS_FIELD, "type=keyword", "text", "type=text,store=true"),
prepareCreate("field-collapsing").setMapping("group", "type=keyword"),
prepareCreate("empty"),
prepareCreate("articles").setMapping(
jsonBuilder().startObject()
.startObject("_doc")

View file

@ -154,8 +154,10 @@ public class QueryRewriteContextIT extends ESIntegTestCase {
public void testResolvedIndices_TransportExplainAction() {
final String[] indices = { "test1", "test2" };
createIndex(indices);
assertAcked(indicesAdmin().prepareAliases().addAlias("test1", "alias1"));
assertAcked(indicesAdmin().prepareAliases().addAlias(indices, "alias2"));
assertAcked(
indicesAdmin().prepareAliases().addAlias("test1", "alias1"),
indicesAdmin().prepareAliases().addAlias(indices, "alias2")
);
assertResolvedIndices(client().prepareExplain("test1", "1"), Set.of("test1"), Set.of("test1"), r -> {});
assertResolvedIndices(client().prepareExplain("alias1", "1"), Set.of("alias1"), Set.of("test1"), r -> {});

View file

@ -183,7 +183,11 @@ public class CrossClusterIT extends AbstractMultiClustersTestCase {
}
}
});
assertBusy(() -> assertTrue(future.isDone()));
try {
future.get();
} catch (ExecutionException e) {
// ignored
}
configureAndConnectsToRemoteClusters();
} finally {
SearchListenerPlugin.allowQueryPhase();
@ -298,20 +302,21 @@ public class CrossClusterIT extends AbstractMultiClustersTestCase {
}
SearchListenerPlugin.allowQueryPhase();
assertBusy(() -> assertTrue(queryFuture.isDone()));
assertBusy(() -> assertTrue(cancelFuture.isDone()));
try {
queryFuture.get();
fail("query should have failed");
} catch (ExecutionException e) {
assertNotNull(e.getCause());
Throwable t = ExceptionsHelper.unwrap(e, TaskCancelledException.class);
assertNotNull(t);
}
cancelFuture.get();
assertBusy(() -> {
final Iterable<TransportService> transportServices = cluster("cluster_a").getInstances(TransportService.class);
for (TransportService transportService : transportServices) {
assertThat(transportService.getTaskManager().getBannedTaskIds(), Matchers.empty());
}
});
ExecutionException e = expectThrows(ExecutionException.class, () -> queryFuture.result());
assertNotNull(e);
assertNotNull(e.getCause());
Throwable t = ExceptionsHelper.unwrap(e, TaskCancelledException.class);
assertNotNull(t);
}
/**

View file

@ -379,11 +379,9 @@ public class CrossClusterSearchIT extends AbstractMultiClustersTestCase {
r.incRef();
l.onResponse(r);
}));
assertBusy(() -> assertTrue(queryFuture.isDone()));
// dfs=true overrides the minimize_roundtrips=true setting and does not minimize roundtrips
if (skipUnavailable == false && minimizeRoundtrips && dfs == false) {
ExecutionException ee = expectThrows(ExecutionException.class, () -> queryFuture.get());
ExecutionException ee = expectThrows(ExecutionException.class, queryFuture::get);
assertNotNull(ee.getCause());
assertThat(ee.getCause(), instanceOf(RemoteTransportException.class));
Throwable rootCause = ExceptionsHelper.unwrap(ee.getCause(), IllegalStateException.class);
@ -622,10 +620,8 @@ public class CrossClusterSearchIT extends AbstractMultiClustersTestCase {
r.incRef();
l.onResponse(r);
}));
assertBusy(() -> assertTrue(queryFuture.isDone()));
if (skipUnavailable == false || minimizeRoundtrips == false) {
ExecutionException ee = expectThrows(ExecutionException.class, () -> queryFuture.get());
ExecutionException ee = expectThrows(ExecutionException.class, queryFuture::get);
assertNotNull(ee.getCause());
Throwable rootCause = ExceptionsHelper.unwrap(ee, IllegalStateException.class);
assertThat(rootCause.getMessage(), containsString("index corrupted"));

View file

@ -15,6 +15,7 @@ import org.apache.logging.log4j.Level;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
@ -380,8 +381,10 @@ public class FieldCapabilitiesIT extends ESIntegTestCase {
}
public void testWithIndexFilter() throws InterruptedException {
assertAcked(prepareCreate("index-1").setMapping("timestamp", "type=date", "field1", "type=keyword"));
assertAcked(prepareCreate("index-2").setMapping("timestamp", "type=date", "field1", "type=long"));
assertAcked(
prepareCreate("index-1").setMapping("timestamp", "type=date", "field1", "type=keyword"),
prepareCreate("index-2").setMapping("timestamp", "type=date", "field1", "type=long")
);
List<IndexRequestBuilder> reqs = new ArrayList<>();
reqs.add(prepareIndex("index-1").setSource("timestamp", "2015-07-08"));
@ -474,8 +477,7 @@ public class FieldCapabilitiesIT extends ESIntegTestCase {
public void testFailures() throws InterruptedException {
// in addition to the existing "old_index" and "new_index", create two where the test query throws an error on rewrite
assertAcked(prepareCreate("index1-error"));
assertAcked(prepareCreate("index2-error"));
assertAcked(prepareCreate("index1-error"), prepareCreate("index2-error"));
ensureGreen("index1-error", "index2-error");
FieldCapabilitiesResponse response = client().prepareFieldCaps()
.setFields("*")
@ -503,9 +505,7 @@ public class FieldCapabilitiesIT extends ESIntegTestCase {
internalCluster().ensureAtLeastNumDataNodes(2);
assertAcked(
prepareCreate("log-index-1").setSettings(indexSettings(between(1, 5), 1))
.setMapping("timestamp", "type=date", "field1", "type=keyword")
);
assertAcked(
.setMapping("timestamp", "type=date", "field1", "type=keyword"),
prepareCreate("log-index-2").setSettings(indexSettings(between(1, 5), 1))
.setMapping("timestamp", "type=date", "field1", "type=long")
);
@ -666,9 +666,11 @@ public class FieldCapabilitiesIT extends ESIntegTestCase {
}
""";
String[] indices = IntStream.range(0, between(1, 9)).mapToObj(n -> "test_many_index_" + n).toArray(String[]::new);
for (String index : indices) {
assertAcked(indicesAdmin().prepareCreate(index).setMapping(mapping).get());
}
assertAcked(
Arrays.stream(indices)
.map(index -> indicesAdmin().prepareCreate(index).setMapping(mapping))
.toArray(CreateIndexRequestBuilder[]::new)
);
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
request.indices("test_many_index_*");
request.fields("*");
@ -787,9 +789,11 @@ public class FieldCapabilitiesIT extends ESIntegTestCase {
Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("hostname")).build();
int numIndices = between(1, 5);
for (int i = 0; i < numIndices; i++) {
assertAcked(indicesAdmin().prepareCreate("test_metrics_" + i).setSettings(settings).setMapping(metricsMapping).get());
assertAcked(
indicesAdmin().prepareCreate("test_metrics_" + i).setSettings(settings).setMapping(metricsMapping),
indicesAdmin().prepareCreate("test_old_metrics_" + i).setMapping(metricsMapping)
);
indexModes.put("test_metrics_" + i, IndexMode.TIME_SERIES);
assertAcked(indicesAdmin().prepareCreate("test_old_metrics_" + i).setMapping(metricsMapping).get());
indexModes.put("test_old_metrics_" + i, IndexMode.STANDARD);
}
}
@ -808,9 +812,11 @@ public class FieldCapabilitiesIT extends ESIntegTestCase {
Settings settings = Settings.builder().put("mode", "logsdb").build();
int numIndices = between(1, 5);
for (int i = 0; i < numIndices; i++) {
assertAcked(indicesAdmin().prepareCreate("test_logs_" + i).setSettings(settings).setMapping(logsMapping).get());
assertAcked(
indicesAdmin().prepareCreate("test_logs_" + i).setSettings(settings).setMapping(logsMapping),
indicesAdmin().prepareCreate("test_old_logs_" + i).setMapping(logsMapping)
);
indexModes.put("test_logs_" + i, IndexMode.LOGSDB);
assertAcked(indicesAdmin().prepareCreate("test_old_logs_" + i).setMapping(logsMapping).get());
indexModes.put("test_old_logs_" + i, IndexMode.STANDARD);
}
}
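The FieldCapabilitiesIT hunks above fold repeated per-index assertAcked(prepareCreate(...).get()) calls into a single varargs call that takes the un-executed request builders. A hedged sketch of the shape such an overload could take; the exact ElasticsearchAssertions signature is assumed, but builder.get() and AcknowledgedResponse.isAcknowledged() are used as elsewhere in these tests:

// Hypothetical varargs helper: execute each builder in turn and assert every
// response is acknowledged, instead of one assertAcked(...) call per index.
@SafeVarargs
static <R extends AcknowledgedResponse> void assertAllAcked(ActionRequestBuilder<?, R>... builders) {
    for (ActionRequestBuilder<?, R> builder : builders) {
        R response = builder.get(); // blocks until the create-index call completes
        assertTrue("request was not acknowledged", response.isAcknowledged());
    }
}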

View file

@ -2003,10 +2003,12 @@ public class FieldSortIT extends ESIntegTestCase {
}
public void testSortMixedFieldTypes() {
assertAcked(prepareCreate("index_long").setMapping("foo", "type=long").get());
assertAcked(prepareCreate("index_integer").setMapping("foo", "type=integer").get());
assertAcked(prepareCreate("index_double").setMapping("foo", "type=double").get());
assertAcked(prepareCreate("index_keyword").setMapping("foo", "type=keyword").get());
assertAcked(
prepareCreate("index_long").setMapping("foo", "type=long"),
prepareCreate("index_integer").setMapping("foo", "type=integer"),
prepareCreate("index_double").setMapping("foo", "type=double"),
prepareCreate("index_keyword").setMapping("foo", "type=keyword")
);
prepareIndex("index_long").setId("1").setSource("foo", "123").get();
prepareIndex("index_integer").setId("1").setSource("foo", "123").get();
@ -2038,9 +2040,11 @@ public class FieldSortIT extends ESIntegTestCase {
}
public void testSortMixedFieldTypesWithNoDocsForOneType() {
assertAcked(prepareCreate("index_long").setMapping("foo", "type=long").get());
assertAcked(prepareCreate("index_other").setMapping("bar", "type=keyword").get());
assertAcked(prepareCreate("index_double").setMapping("foo", "type=double").get());
assertAcked(
prepareCreate("index_long").setMapping("foo", "type=long"),
prepareCreate("index_other").setMapping("bar", "type=keyword"),
prepareCreate("index_double").setMapping("foo", "type=double")
);
prepareIndex("index_long").setId("1").setSource("foo", "123").get();
prepareIndex("index_long").setId("2").setSource("foo", "124").get();

View file

@ -565,9 +565,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
assertSuccessful(sn1);
assertSuccessful(sn2);
assertSuccessful(sn3);
assertAcked(clone1.get());
assertAcked(clone2.get());
assertAcked(clone3.get());
assertAcked(clone1, clone2, clone3);
}
public void testStartSnapshotWithSuccessfulShardClonePendingFinalization() throws Exception {
@ -624,8 +622,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
unblockNode(repoName, masterName);
awaitNoMoreRunningOperations(masterName);
awaitMasterFinishRepoOperations();
assertAcked(blockedClone.get());
assertAcked(otherClone.get());
assertAcked(blockedClone, otherClone);
assertEquals(getSnapshot(repoName, cloneName).state(), SnapshotState.SUCCESS);
assertEquals(getSnapshot(repoName, otherCloneName).state(), SnapshotState.SUCCESS);
}
@ -732,8 +729,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
awaitClusterState(state -> SnapshotsInProgress.get(state).forRepo(repoName).stream().anyMatch(entry -> entry.state().completed()));
repo.unblock();
assertAcked(clone1.get());
assertAcked(clone2.get());
assertAcked(clone1, clone2);
}
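The clone assertions above pass ActionFutures that are already executing rather than request builders; a counterpart sketch (again with an assumed signature) simply blocks on each future before checking acknowledgement:

static void assertAllAcked(List<ActionFuture<AcknowledgedResponse>> futures) throws Exception {
    for (ActionFuture<AcknowledgedResponse> future : futures) {
        // get() rethrows any clone failure, so a failed or non-acked clone fails the test
        assertTrue(future.get().isAcknowledged());
    }
}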
public void testRemoveFailedCloneFromCSWithoutIO() throws Exception {

View file

@ -478,8 +478,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
assertThat(thirdSnapshotResponse.get().getSnapshotInfo().state(), is(SnapshotState.FAILED));
logger.info("--> verify both deletes have completed");
assertAcked(deleteSnapshotsResponse.get());
assertAcked(allDeletedResponse.get());
assertAcked(deleteSnapshotsResponse, allDeletedResponse);
logger.info("--> verify that all snapshots are gone");
assertThat(clusterAdmin().prepareGetSnapshots(TEST_REQUEST_TIMEOUT, repoName).get().getSnapshots(), empty());
@ -715,8 +714,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
networkDisruption.stopDisrupting();
logger.info("--> make sure all failing requests get a response");
assertAcked(firstDeleteFuture.get());
assertAcked(secondDeleteFuture.get());
assertAcked(firstDeleteFuture, secondDeleteFuture);
expectThrows(SnapshotException.class, createSnapshot);
awaitNoMoreRunningOperations();
}
@ -1014,8 +1012,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
awaitNDeletionsInProgress(2);
unblockNode(repoName, masterName);
assertAcked(deleteSnapshotOne.get());
assertAcked(deleteSnapshotTwo.get());
assertAcked(deleteSnapshotOne, deleteSnapshotTwo);
final RepositoryData repositoryData = getRepositoryData(repoName);
assertThat(repositoryData.getSnapshotIds(), empty());
@ -1361,9 +1358,12 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
if (deleteAndAbortAll) {
awaitNumberOfSnapshotsInProgress(0);
for (ActionFuture<CreateSnapshotResponse> snapshotFuture : snapshotFutures) {
// just check that the futures resolve, whether or not things worked out with the snapshot actually finalizing or failing
// due to the abort does not matter
assertBusy(() -> assertTrue(snapshotFuture.isDone()));
try {
snapshotFuture.get();
} catch (ExecutionException e) {
// just check that the futures resolve; whether the snapshot actually finalized or failed
// due to the abort does not matter
}
}
assertThat(getRepositoryData(repoName).getSnapshotIds(), empty());
} else {
@ -1890,8 +1890,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
assertSuccessful(snapshot3);
unblockNode(repository, master);
assertAcked(cloneSnapshot.get());
assertAcked(cloneSnapshot2.get());
assertAcked(cloneSnapshot, cloneSnapshot2);
assertAcked(startDeleteSnapshot(repository, cloneTarget).get());
assertThat(
@ -2031,8 +2030,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
awaitNumberOfSnapshotsInProgress(2);
unblockNode(repository, master);
assertAcked(deleteFuture.get());
assertAcked(cloneFuture.get());
assertAcked(deleteFuture, cloneFuture);
awaitNoMoreRunningOperations();
assertThat(snapshot1.get().getSnapshotInfo().state(), is(SnapshotState.PARTIAL));
}
@ -2109,8 +2107,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
awaitNumberOfSnapshotsInProgress(3);
unblockNode(repository, master);
assertAcked(deleteFuture.get());
assertAcked(cloneFuture.get());
assertAcked(deleteFuture, cloneFuture);
awaitNoMoreRunningOperations();
assertThat(snapshot1.get().getSnapshotInfo().state(), is(SnapshotState.PARTIAL));
assertThat(snapshot2.get().getSnapshotInfo().state(), is(SnapshotState.PARTIAL));

View file

@ -664,8 +664,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
createSnapshot(repo, snapshot, Collections.singletonList(shrunkIdx));
logger.info("--> delete index and stop the data node");
assertAcked(indicesAdmin().prepareDelete(sourceIdx).get());
assertAcked(indicesAdmin().prepareDelete(shrunkIdx).get());
assertAcked(indicesAdmin().prepareDelete(sourceIdx), indicesAdmin().prepareDelete(shrunkIdx));
internalCluster().stopRandomDataNode();
clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT).setTimeout(TimeValue.timeValueSeconds(30)).setWaitForNodes("1");

View file

@ -145,6 +145,7 @@ public class TransportVersions {
public static final TransportVersion EQL_ALLOW_PARTIAL_SEARCH_RESULTS = def(8_809_00_0);
public static final TransportVersion NODE_VERSION_INFORMATION_WITH_MIN_READ_ONLY_INDEX_VERSION = def(8_810_00_0);
public static final TransportVersion ERROR_TRACE_IN_TRANSPORT_HEADER = def(8_811_00_0);
public static final TransportVersion FAILURE_STORE_ENABLED_BY_CLUSTER_SETTING = def(8_812_00_0);
/*
* WARNING: DO NOT MERGE INTO MAIN!

View file

@ -480,13 +480,19 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
return this;
}
/**
* Returns whether the failure store should be initialized. N.B. If true, failure store index creation will be performed regardless of
* whether the template indicates that the failure store is enabled.
*/
public boolean isInitializeFailureStore() {
return initializeFailureStore;
}
/**
* Set whether this CreateIndexRequest should initialize the failure store on data stream creation. This can be necessary when, for
* example, a failure occurs while trying to ingest a document into a data stream that has to be auto-created.
* example, a failure occurs while trying to ingest a document into a data stream that has to be auto-created. N.B. If true, failure
* store index creation will be performed regardless of whether the template indicates that the failure store is enabled. It is the
* caller's responsibility to ensure that this is correct.
*/
public CreateIndexRequest initializeFailureStore(boolean initializeFailureStore) {
this.initializeFailureStore = initializeFailureStore;
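Given the expanded javadoc above, a short hypothetical usage showing the caller-side contract (the data stream name is illustrative):

// The flag forces failure-store index creation even when the matched template
// does not enable the failure store, so the caller must resolve enablement first.
CreateIndexRequest request = new CreateIndexRequest("logs-app-default");
request.initializeFailureStore(true);
assert request.isInitializeFailureStore(); // getter mirrors the setter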

View file

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -101,6 +102,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
private final Set<Integer> failedRolloverRequests = ConcurrentCollections.newConcurrentSet();
private final Map<ShardId, Exception> shortCircuitShardFailures = ConcurrentCollections.newConcurrentMap();
private final FailureStoreMetrics failureStoreMetrics;
private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings;
BulkOperation(
Task task,
@ -115,7 +117,8 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
LongSupplier relativeTimeProvider,
long startTimeNanos,
ActionListener<BulkResponse> listener,
FailureStoreMetrics failureStoreMetrics
FailureStoreMetrics failureStoreMetrics,
DataStreamFailureStoreSettings dataStreamFailureStoreSettings
) {
this(
task,
@ -132,7 +135,8 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
listener,
new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext()),
new FailureStoreDocumentConverter(),
failureStoreMetrics
failureStoreMetrics,
dataStreamFailureStoreSettings
);
}
@ -151,7 +155,8 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
ActionListener<BulkResponse> listener,
ClusterStateObserver observer,
FailureStoreDocumentConverter failureStoreDocumentConverter,
FailureStoreMetrics failureStoreMetrics
FailureStoreMetrics failureStoreMetrics,
DataStreamFailureStoreSettings dataStreamFailureStoreSettings
) {
super(listener);
this.task = task;
@ -171,6 +176,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
this.rolloverClient = new OriginSettingClient(client, LAZY_ROLLOVER_ORIGIN);
this.shortCircuitShardFailures.putAll(bulkRequest.incrementalState().shardLevelFailures());
this.failureStoreMetrics = failureStoreMetrics;
this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings;
}
@Override
@ -556,7 +562,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
// Do not redirect documents to a failure store that were already headed to one.
var isFailureStoreRequest = isFailureStoreRequest(docWriteRequest);
if (isFailureStoreRequest == false
&& failureStoreCandidate.isFailureStoreEnabled()
&& failureStoreCandidate.isFailureStoreEffectivelyEnabled(dataStreamFailureStoreSettings)
&& error instanceof VersionConflictEngineException == false
&& error instanceof EsRejectedExecutionException == false) {
// Prepare the data stream failure store if necessary
@ -589,7 +595,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
if (isFailureStoreRequest) {
return IndexDocFailureStoreStatus.FAILED;
}
if (failureStoreCandidate.isFailureStoreEnabled() == false) {
if (failureStoreCandidate.isFailureStoreEffectivelyEnabled(dataStreamFailureStoreSettings) == false) {
return IndexDocFailureStoreStatus.NOT_ENABLED;
}
}
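The redirect guard above can be read as a pure predicate; restated here with stand-in exception types in place of Elasticsearch's VersionConflictEngineException and EsRejectedExecutionException (illustrative only):

class VersionConflict extends Exception {}
class RejectedExecution extends Exception {}

static boolean shouldRedirectToFailureStore(boolean isFailureStoreRequest,
                                            boolean effectivelyEnabled,
                                            Exception error) {
    return isFailureStoreRequest == false        // never re-redirect a failure-store doc
        && effectivelyEnabled                    // per-stream option or cluster setting
        && (error instanceof VersionConflict) == false
        && (error instanceof RejectedExecution) == false;
}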

View file

@ -356,9 +356,11 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
* @param indexName The index name to check.
* @param metadata Cluster state metadata.
* @param epochMillis A timestamp to use when resolving date math in the index name.
* @return true if this is not a simulation, and the given index name corresponds to a data stream with a failure store
* or if it matches a template that has a data stream failure store enabled. Returns false if the index name corresponds to a
* data stream, but it doesn't have the failure store enabled. Returns null when it doesn't correspond to a data stream.
* @return true if this is not a simulation, and the given index name corresponds to a data stream with a failure store, or if it
* matches a template that has a data stream failure store enabled, or if it matches a data stream template with no failure store
* option specified and the name matches the cluster setting to enable the failure store. Returns false if the index name
* corresponds to a data stream, but it doesn't have the failure store enabled by one of those conditions. Returns null when it
* doesn't correspond to a data stream.
*/
protected abstract Boolean resolveFailureStore(String indexName, ProjectMetadata metadata, long epochMillis);
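A hypothetical caller-side reading of the tri-state contract documented above (not from the Elasticsearch source):

static String describeResolution(Boolean resolution) {
    if (resolution == null) {
        return "not a data stream: handle the failure as usual";
    }
    return resolution
        ? "failure store applies: redirect the failed document"
        : "data stream without a failure store: surface the failure";
}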

View file

@ -17,7 +17,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteRequest.OpType;
import org.elasticsearch.action.admin.indices.create.AutoCreateAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@ -35,6 +34,7 @@ import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings;
import org.elasticsearch.cluster.metadata.DataStreamOptions;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -86,6 +86,7 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final OriginSettingClient rolloverClient;
private final FailureStoreMetrics failureStoreMetrics;
private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings;
@Inject
public TransportBulkAction(
@ -100,7 +101,8 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
IndexingPressure indexingPressure,
SystemIndices systemIndices,
ProjectResolver projectResolver,
FailureStoreMetrics failureStoreMetrics
FailureStoreMetrics failureStoreMetrics,
DataStreamFailureStoreSettings dataStreamFailureStoreSettings
) {
this(
threadPool,
@ -115,7 +117,8 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
systemIndices,
projectResolver,
threadPool::relativeTimeInNanos,
failureStoreMetrics
failureStoreMetrics,
dataStreamFailureStoreSettings
);
}
@ -132,7 +135,8 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
SystemIndices systemIndices,
ProjectResolver projectResolver,
LongSupplier relativeTimeProvider,
FailureStoreMetrics failureStoreMetrics
FailureStoreMetrics failureStoreMetrics,
DataStreamFailureStoreSettings dataStreamFailureStoreSettings
) {
this(
TYPE,
@ -149,7 +153,8 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
systemIndices,
projectResolver,
relativeTimeProvider,
failureStoreMetrics
failureStoreMetrics,
dataStreamFailureStoreSettings
);
}
@ -168,7 +173,8 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
SystemIndices systemIndices,
ProjectResolver projectResolver,
LongSupplier relativeTimeProvider,
FailureStoreMetrics failureStoreMetrics
FailureStoreMetrics failureStoreMetrics,
DataStreamFailureStoreSettings dataStreamFailureStoreSettings
) {
super(
bulkAction,
@ -183,6 +189,7 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
projectResolver,
relativeTimeProvider
);
this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings;
Objects.requireNonNull(relativeTimeProvider);
this.featureService = featureService;
this.client = client;
@ -310,7 +317,7 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
for (DocWriteRequest<?> request : bulkRequest.requests) {
// Delete requests should not attempt to create the index (if the index does not exist), unless an external versioning is used.
if (request.opType() == OpType.DELETE
if (request.opType() == DocWriteRequest.OpType.DELETE
&& request.versionType() != VersionType.EXTERNAL
&& request.versionType() != VersionType.EXTERNAL_GTE) {
continue;
@ -520,7 +527,7 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
static void prohibitAppendWritesInBackingIndices(DocWriteRequest<?> writeRequest, IndexAbstraction indexAbstraction) {
DocWriteRequest.OpType opType = writeRequest.opType();
if ((opType == OpType.CREATE || opType == OpType.INDEX) == false) {
if ((opType == DocWriteRequest.OpType.CREATE || opType == DocWriteRequest.OpType.INDEX) == false) {
// op type not create or index, then bail early
return;
}
@ -617,7 +624,8 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
relativeTimeNanosProvider,
startTimeNanos,
listener,
failureStoreMetrics
failureStoreMetrics,
dataStreamFailureStoreSettings
).run();
}
@ -625,7 +633,7 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
* See {@link #resolveFailureStore(String, ProjectMetadata, long)}
*/
// Visibility for testing
static Boolean resolveFailureInternal(String indexName, ProjectMetadata projectMetadata, long epochMillis) {
Boolean resolveFailureInternal(String indexName, ProjectMetadata projectMetadata, long epochMillis) {
if (DataStream.isFailureStoreFeatureFlagEnabled() == false) {
return null;
}
@ -633,7 +641,7 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
if (resolution != null) {
return resolution;
}
return resolveFailureStoreFromTemplate(indexName, projectMetadata);
return resolveFailureStoreFromTemplate(indexName, projectMetadata, epochMillis);
}
@Override
@ -648,7 +656,7 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
* @param epochMillis A timestamp to use when resolving date math in the index name.
* @return true if the given index name corresponds to an existing data stream with a failure store enabled.
*/
private static Boolean resolveFailureStoreFromMetadata(String indexName, ProjectMetadata projectMetadata, long epochMillis) {
private Boolean resolveFailureStoreFromMetadata(String indexName, ProjectMetadata projectMetadata, long epochMillis) {
if (indexName == null) {
return null;
}
@ -665,7 +673,7 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
DataStream targetDataStream = DataStream.resolveDataStream(indexAbstraction, projectMetadata);
// We will store the failure if the write target belongs to a data stream with a failure store.
return targetDataStream != null && targetDataStream.isFailureStoreEnabled();
return targetDataStream != null && targetDataStream.isFailureStoreEffectivelyEnabled(dataStreamFailureStoreSettings);
}
/**
@ -673,18 +681,20 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
* a data stream feature, the method returns true/false only if it is a data stream template, otherwise null.
* @param indexName The index name to check.
* @param projectMetadata ProjectMetadata from the cluster state.
* @return true the associated index template has failure store enabled, false if the failure store is disabled or it's not specified,
* and null if the template is not a data stream template.
* Visible for testing
* @param epochMillis A timestamp to use when resolving date math in the index name.
* @return true if the associated index template has the failure store enabled, false if the failure store is disabled, true or false according
* to the cluster setting if there is a data stream template with no failure store option specified, and null if no template is
* found or if the template is not a data stream template.
*/
@Nullable
static Boolean resolveFailureStoreFromTemplate(String indexName, ProjectMetadata projectMetadata) {
private Boolean resolveFailureStoreFromTemplate(String indexName, ProjectMetadata projectMetadata, long epochMillis) {
if (indexName == null) {
return null;
}
// Check to see if the index name matches any templates such that an index would have been attributed
// We don't check v1 templates at all because failure stores can only exist on data streams via a v2 template
// N.B. This currently does date math resolution itself and does *not* use epochMillis (it gets the system time again)
String template = MetadataIndexTemplateService.findV2Template(projectMetadata, indexName, false);
if (template != null) {
// Check if this is a data stream template or if it is just a normal index.
@ -695,7 +705,12 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
composableIndexTemplate,
projectMetadata.componentTemplates()
).mapAndGet(DataStreamOptions.Template::toDataStreamOptions);
return dataStreamOptions != null && dataStreamOptions.isFailureStoreEnabled();
return DataStream.isFailureStoreEffectivelyEnabled(
dataStreamOptions,
dataStreamFailureStoreSettings,
IndexNameExpressionResolver.resolveDateMathExpression(indexName, epochMillis),
systemIndices
);
}
}
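The new template path resolves date math with the request's epochMillis and then defers to DataStream.isFailureStoreEffectivelyEnabled. A sketch of the precedence that method's arguments imply: an explicit per-template option wins, otherwise the cluster setting's name patterns decide (the real method also consults SystemIndices, omitted here; names below are stand-ins):

import java.util.function.Predicate;

static boolean effectivelyEnabled(Boolean explicitOption,
                                  Predicate<String> clusterSettingMatcher,
                                  String resolvedDataStreamName) {
    if (explicitOption != null) {
        return explicitOption; // data stream options said enabled or disabled
    }
    return clusterSettingMatcher.test(resolvedDataStreamName);
}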

View file

@ -234,6 +234,7 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
private final DataStream dataStream;
private final ClusterHealthStatus dataStreamStatus;
private final boolean failureStoreEffectivelyEnabled; // Must be serialized independently of dataStream as it depends on settings
@Nullable
private final String indexTemplate;
@Nullable
@ -247,6 +248,7 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
public DataStreamInfo(
DataStream dataStream,
boolean failureStoreEffectivelyEnabled,
ClusterHealthStatus dataStreamStatus,
@Nullable String indexTemplate,
@Nullable String ilmPolicyName,
@ -256,6 +258,7 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
@Nullable Long maximumTimestamp
) {
this.dataStream = dataStream;
this.failureStoreEffectivelyEnabled = failureStoreEffectivelyEnabled;
this.dataStreamStatus = dataStreamStatus;
this.indexTemplate = indexTemplate;
this.ilmPolicyName = ilmPolicyName;
@ -267,22 +270,32 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
@SuppressWarnings("unchecked")
DataStreamInfo(StreamInput in) throws IOException {
this(
DataStream.read(in),
ClusterHealthStatus.readFrom(in),
in.readOptionalString(),
in.readOptionalString(),
in.getTransportVersion().onOrAfter(TransportVersions.V_8_3_0) ? in.readOptionalWriteable(TimeSeries::new) : null,
in.getTransportVersion().onOrAfter(V_8_11_X) ? in.readMap(Index::new, IndexProperties::new) : Map.of(),
in.getTransportVersion().onOrAfter(V_8_11_X) ? in.readBoolean() : true,
in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? in.readOptionalVLong() : null
);
this.dataStream = DataStream.read(in);
this.failureStoreEffectivelyEnabled = in.getTransportVersion()
.onOrAfter(TransportVersions.FAILURE_STORE_ENABLED_BY_CLUSTER_SETTING)
? in.readBoolean()
: dataStream.isFailureStoreExplicitlyEnabled(); // Revert to the behaviour before this field was added
this.dataStreamStatus = ClusterHealthStatus.readFrom(in);
this.indexTemplate = in.readOptionalString();
this.ilmPolicyName = in.readOptionalString();
this.timeSeries = in.getTransportVersion().onOrAfter(TransportVersions.V_8_3_0)
? in.readOptionalWriteable(TimeSeries::new)
: null;
this.indexSettingsValues = in.getTransportVersion().onOrAfter(V_8_11_X)
? in.readMap(Index::new, IndexProperties::new)
: Map.of();
this.templatePreferIlmValue = in.getTransportVersion().onOrAfter(V_8_11_X) ? in.readBoolean() : true;
this.maximumTimestamp = in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? in.readOptionalVLong() : null;
}
public DataStream getDataStream() {
return dataStream;
}
public boolean isFailureStoreEffectivelyEnabled() {
return failureStoreEffectivelyEnabled;
}
public ClusterHealthStatus getDataStreamStatus() {
return dataStreamStatus;
}
@ -318,6 +331,9 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
@Override
public void writeTo(StreamOutput out) throws IOException {
dataStream.writeTo(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.FAILURE_STORE_ENABLED_BY_CLUSTER_SETTING)) {
out.writeBoolean(failureStoreEffectivelyEnabled);
}
dataStreamStatus.writeTo(out);
out.writeOptionalString(indexTemplate);
out.writeOptionalString(ilmPolicyName);
@ -398,7 +414,7 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
}
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
builder.startObject(DataStream.FAILURE_STORE_FIELD.getPreferredName());
builder.field(FAILURE_STORE_ENABLED.getPreferredName(), dataStream.isFailureStoreEnabled());
builder.field(FAILURE_STORE_ENABLED.getPreferredName(), failureStoreEffectivelyEnabled);
builder.field(
DataStream.ROLLOVER_ON_WRITE_FIELD.getPreferredName(),
dataStream.getFailureIndices().isRolloverOnWrite()
@ -477,6 +493,7 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
DataStreamInfo that = (DataStreamInfo) o;
return templatePreferIlmValue == that.templatePreferIlmValue
&& Objects.equals(dataStream, that.dataStream)
&& failureStoreEffectivelyEnabled == that.failureStoreEffectivelyEnabled
&& dataStreamStatus == that.dataStreamStatus
&& Objects.equals(indexTemplate, that.indexTemplate)
&& Objects.equals(ilmPolicyName, that.ilmPolicyName)
@ -490,6 +507,7 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
return Objects.hash(
dataStream,
dataStreamStatus,
failureStoreEffectivelyEnabled,
indexTemplate,
ilmPolicyName,
timeSeries,
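The read side at the top of this hunk and the write side in writeTo must stay symmetric around the new transport version. Minimal stand-ins illustrating the gate (Out/In play the roles of StreamOutput/StreamInput, and the id mirrors FAILURE_STORE_ENABLED_BY_CLUSTER_SETTING):

interface Out { int version(); void writeBoolean(boolean b); }
interface In  { int version(); boolean readBoolean(); }

static final int GATE = 8_812_00_0;

static void write(Out out, boolean effectivelyEnabled) {
    if (out.version() >= GATE) {       // only newer nodes receive the field
        out.writeBoolean(effectivelyEnabled);
    }
}

static boolean read(In in, boolean legacyFallback) {
    // legacyFallback plays the role of dataStream.isFailureStoreExplicitlyEnabled()
    return in.version() >= GATE ? in.readBoolean() : legacyFallback;
}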

View file

@ -23,7 +23,6 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.VersionType;
@ -52,7 +51,6 @@ public class MultiGetRequest extends ActionRequest
CompositeIndicesRequest,
RealtimeRequest,
ToXContentObject {
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(MultiGetRequest.class);
private static final ParseField DOCS = new ParseField("docs");
private static final ParseField INDEX = new ParseField("_index");

View file

@ -51,6 +51,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.function.Supplier;
import static org.elasticsearch.action.ValidateActions.addValidationError;
@ -78,7 +79,6 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
private static final TransportVersion PIPELINES_HAVE_RUN_FIELD_ADDED = TransportVersions.V_8_10_X;
private static final Supplier<String> ID_GENERATOR = UUIDs::base64UUID;
private static final Supplier<String> K_SORTED_TIME_BASED_ID_GENERATOR = UUIDs::base64TimeBasedKOrderedUUID;
/**
* Max length of the source document to include into string()
@ -705,9 +705,18 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
}
public void autoGenerateTimeBasedId() {
autoGenerateTimeBasedId(OptionalInt.empty());
}
/**
* Set the {@code #id()} to an automatically generated one, optimized for storage (compression) efficiency.
* If a routing hash is passed, it is included in the generated id starting at 9 bytes before the end.
* @param hash optional routing hash value, used to route requests by id to the right shard.
*/
public void autoGenerateTimeBasedId(OptionalInt hash) {
assertBeforeGeneratingId();
autoGenerateTimestamp();
id(K_SORTED_TIME_BASED_ID_GENERATOR.get());
id(UUIDs.base64TimeBasedKOrderedUUIDWithHash(hash));
}
private void autoGenerateTimestamp() {
@ -899,12 +908,18 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
}
/**
* Transient flag denoting that the local request should be routed to a failure store. Not persisted across the wire.
* Returns a transient flag denoting that the local request should be routed to a failure store. Not persisted across the wire. N.B. If
* true, the failure store will be used regardless of whether the metadata indicates that the failure store is enabled.
*/
public boolean isWriteToFailureStore() {
return writeToFailureStore;
}
/**
* Sets a transient flag denoting that the local request should be routed to a failure store. Not persisted across the wire. N.B. If
* true, the failure store will be used regardless of whether the metadata indicates that the failure store is enabled. It is the
* caller's responsibility to ensure that this is correct.
*/
public IndexRequest setWriteToFailureStore(boolean writeToFailureStore) {
this.writeToFailureStore = writeToFailureStore;
return this;
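Usage of the two id-generation entry points defined in this file; the index name and hash value are illustrative:

import java.util.OptionalInt;

IndexRequest plain = new IndexRequest("logs-app-default");
plain.autoGenerateTimeBasedId();                     // k-sorted id, no routing hash

IndexRequest routed = new IndexRequest("logs-app-default");
routed.autoGenerateTimeBasedId(OptionalInt.of(42));  // hash lands 9 bytes before the end of the id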

Some files were not shown because too many files have changed in this diff.