Add point in time to HLRC (#72167)

Closes #70593
This commit is contained in:
Nhat Nguyen 2021-05-12 17:59:25 -04:00 committed by GitHub
parent ce41fd7e2a
commit 44fc661835
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 489 additions and 98 deletions

View file

@ -30,7 +30,9 @@ import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardCount;
@ -401,7 +403,9 @@ final class RequestConverters {
params.withPreference(searchRequest.preference()); params.withPreference(searchRequest.preference());
params.withIndicesOptions(searchRequest.indicesOptions()); params.withIndicesOptions(searchRequest.indicesOptions());
params.withSearchType(searchRequest.searchType().name().toLowerCase(Locale.ROOT)); params.withSearchType(searchRequest.searchType().name().toLowerCase(Locale.ROOT));
params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips())); if (searchRequest.isCcsMinimizeRoundtrips() != SearchRequest.defaultCcsMinimizeRoundtrips(searchRequest)) {
params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
}
if (searchRequest.getPreFilterShardSize() != null) { if (searchRequest.getPreFilterShardSize() != null) {
params.putParam("pre_filter_shard_size", Integer.toString(searchRequest.getPreFilterShardSize())); params.putParam("pre_filter_shard_size", Integer.toString(searchRequest.getPreFilterShardSize()));
} }
@ -430,6 +434,23 @@ final class RequestConverters {
return request; return request;
} }
static Request openPointInTime(OpenPointInTimeRequest openRequest) {
Request request = new Request(HttpPost.METHOD_NAME, endpoint(openRequest.indices(), "_pit"));
Params params = new Params();
params.withIndicesOptions(openRequest.indicesOptions());
params.withRouting(openRequest.routing());
params.withPreference(openRequest.preference());
params.putParam("keep_alive", openRequest.keepAlive());
request.addParameters(params.asMap());
return request;
}
static Request closePointInTime(ClosePointInTimeRequest closeRequest) throws IOException {
Request request = new Request(HttpDelete.METHOD_NAME, "/_pit");
request.setEntity(createEntity(closeRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}
static Request multiSearch(MultiSearchRequest multiSearchRequest) throws IOException { static Request multiSearch(MultiSearchRequest multiSearchRequest) throws IOException {
Request request = new Request(HttpPost.METHOD_NAME, "/_msearch"); Request request = new Request(HttpPost.METHOD_NAME, "/_msearch");

View file

@ -35,8 +35,12 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
import org.elasticsearch.action.search.ClosePointInTimeResponse;
import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.OpenPointInTimeResponse;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.SearchScrollRequest;
@ -1283,6 +1287,66 @@ public class RestHighLevelClient implements Closeable {
options, ClearScrollResponse::fromXContent, listener, emptySet()); options, ClearScrollResponse::fromXContent, listener, emptySet());
} }
/**
* Open a point in time before using it in search requests.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/point-in-time-api.html"> Point in time API </a>
* @param openRequest the open request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response containing the point in time id
*/
public final OpenPointInTimeResponse openPointInTime(OpenPointInTimeRequest openRequest,
RequestOptions options) throws IOException {
return performRequestAndParseEntity(openRequest, RequestConverters::openPointInTime,
options, OpenPointInTimeResponse::fromXContent, emptySet());
}
/**
* Asynchronously open a point in time before using it in search requests
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/point-in-time-api.html"> Point in time API </a>
* @param openRequest the open request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return a cancellable that may be used to cancel the request
*/
public final Cancellable openPointInTimeAsync(OpenPointInTimeRequest openRequest,
RequestOptions options,
ActionListener<OpenPointInTimeResponse> listener) {
return performRequestAsyncAndParseEntity(openRequest, RequestConverters::openPointInTime,
options, OpenPointInTimeResponse::fromXContent, listener, emptySet());
}
/**
* Close a point in time that is opened with {@link #openPointInTime(OpenPointInTimeRequest, RequestOptions)} or
* {@link #openPointInTimeAsync(OpenPointInTimeRequest, RequestOptions, ActionListener)}.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/point-in-time-api.html#close-point-in-time-api">
* Close point in time API</a>
* @param closeRequest the close request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final ClosePointInTimeResponse closePointInTime(ClosePointInTimeRequest closeRequest,
RequestOptions options) throws IOException {
return performRequestAndParseEntity(closeRequest, RequestConverters::closePointInTime, options,
ClosePointInTimeResponse::fromXContent, emptySet());
}
/**
* Asynchronously close a point in time that is opened with {@link #openPointInTime(OpenPointInTimeRequest, RequestOptions)} or
* {@link #openPointInTimeAsync(OpenPointInTimeRequest, RequestOptions, ActionListener)}.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/point-in-time-api.html#close-point-in-time-api">
* Close point in time API</a>
* @param closeRequest the close request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return a cancellable that may be used to cancel the request
*/
public final Cancellable closePointInTimeAsync(ClosePointInTimeRequest closeRequest,
RequestOptions options,
ActionListener<ClosePointInTimeResponse> listener) {
return performRequestAsyncAndParseEntity(closeRequest, RequestConverters::closePointInTime,
options, ClosePointInTimeResponse::fromXContent, listener, emptySet());
}
/** /**
* Executes a request using the Search Template API. * Executes a request using the Search Template API.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-template.html">Search Template API * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-template.html">Search Template API

View file

@ -29,7 +29,9 @@ import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.SearchType;
@ -83,6 +85,7 @@ import org.elasticsearch.script.mustache.SearchTemplateRequest;
import org.elasticsearch.search.Scroll; import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValueType; import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.collapse.CollapseBuilder; import org.elasticsearch.search.collapse.CollapseBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
@ -1028,7 +1031,12 @@ public class RequestConvertersTests extends ESTestCase {
String[] indices = randomIndicesNames(0, 5); String[] indices = randomIndicesNames(0, 5);
Map<String, String> expectedParams = new HashMap<>(); Map<String, String> expectedParams = new HashMap<>();
SearchRequest searchRequest = createTestSearchRequest(indices, expectedParams); SearchRequest searchRequest = createTestSearchRequest(indices, expectedParams);
if (searchRequest.source() != null && randomBoolean()) {
PointInTimeBuilder pit = new PointInTimeBuilder(randomAlphaOfLength(100));
if (randomBoolean()) {
pit.setKeepAlive(TimeValue.timeValueMinutes(between(1, 10)));
}
}
Request request = RequestConverters.search(searchRequest, searchEndpoint); Request request = RequestConverters.search(searchRequest, searchEndpoint);
StringJoiner endpoint = new StringJoiner("/", "/", ""); StringJoiner endpoint = new StringJoiner("/", "/", "");
String index = String.join(",", indices); String index = String.join(",", indices);
@ -1291,7 +1299,7 @@ public class RequestConvertersTests extends ESTestCase {
assertEquals(HttpPost.METHOD_NAME, request.getMethod()); assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals(endpoint.toString(), request.getEndpoint()); assertEquals(endpoint.toString(), request.getEndpoint());
assertEquals(expectedParams, request.getParameters()); assertThat(request.getParameters(), equalTo(expectedParams));
assertToXContentBody(searchTemplateRequest, request.getEntity()); assertToXContentBody(searchTemplateRequest, request.getEntity());
} }
@ -1410,6 +1418,55 @@ public class RequestConvertersTests extends ESTestCase {
assertToXContentBody(explainRequest, request.getEntity()); assertToXContentBody(explainRequest, request.getEntity());
} }
public void testPointInTime() throws Exception {
// Open point in time
{
Map<String, String> expectedParams = new HashMap<>();
String[] indices = randomIndicesNames(1, 5);
OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest(indices);
String keepAlive = randomFrom("1ms", "2m", "1d");
openRequest.keepAlive(TimeValue.parseTimeValue(keepAlive, "keep_alive"));
expectedParams.put("keep_alive", keepAlive);
if (randomBoolean()) {
String routing = randomAlphaOfLengthBetween(1, 10);
openRequest.routing(routing);
expectedParams.put("routing", routing);
}
if (randomBoolean()) {
String preference = randomAlphaOfLengthBetween(1, 10);
openRequest.preference(preference);
expectedParams.put("preference", preference);
}
openRequest.indicesOptions(setRandomIndicesOptions(openRequest.indicesOptions(), expectedParams));
final Request request = RequestConverters.openPointInTime(openRequest);
assertThat(request.getParameters(), equalTo(expectedParams));
assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME));
final String expectedEndpoint = "/" + String.join(",", indices) + "/_pit";
assertThat(request.getEndpoint(), equalTo(expectedEndpoint));
}
// Search with point in time
{
SearchRequest searchRequest = new SearchRequest();
searchRequest.source(new SearchSourceBuilder());
String pitID = randomAlphaOfLength(10);
final PointInTimeBuilder pointInTimeBuilder = new PointInTimeBuilder(pitID);
if (randomBoolean()) {
pointInTimeBuilder.setKeepAlive(randomFrom(TimeValue.timeValueSeconds(1), TimeValue.timeValueMillis(10)));
}
searchRequest.source().pointInTimeBuilder(pointInTimeBuilder);
final Request request = RequestConverters.search(searchRequest, "/_search");
assertToXContentBody(searchRequest.source(), request.getEntity());
}
// close PIT
{
String id = randomAlphaOfLengthBetween(3, 10);
Request request = RequestConverters.closePointInTime(new ClosePointInTimeRequest(id));
assertThat(request.getMethod(), equalTo(HttpDelete.METHOD_NAME));
assertThat(request.getEndpoint(), equalTo("/_pit"));
assertThat(EntityUtils.toString(request.getEntity()), equalTo("{\"id\":" + "\"" + id + "\"}"));
}
}
public void testTermVectors() throws IOException { public void testTermVectors() throws IOException {
String index = randomAlphaOfLengthBetween(3, 10); String index = randomAlphaOfLengthBetween(3, 10);
String id = randomAlphaOfLengthBetween(3, 10); String id = randomAlphaOfLengthBetween(3, 10);
@ -1915,9 +1972,12 @@ public class RequestConvertersTests extends ESTestCase {
expectedParams.put("scroll", searchRequest.scroll().keepAlive().getStringRep()); expectedParams.put("scroll", searchRequest.scroll().keepAlive().getStringRep());
} }
if (randomBoolean()) { if (randomBoolean()) {
searchRequest.setCcsMinimizeRoundtrips(randomBoolean()); boolean ccsMinimizeRoundtrips = randomBoolean();
searchRequest.setCcsMinimizeRoundtrips(ccsMinimizeRoundtrips);
if (ccsMinimizeRoundtrips == false) {
expectedParams.put("ccs_minimize_roundtrips", "false");
}
} }
expectedParams.put("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
if (randomBoolean()) { if (randomBoolean()) {
searchRequest.setMaxConcurrentShardRequests(randomIntBetween(1, Integer.MAX_VALUE)); searchRequest.setMaxConcurrentShardRequests(randomIntBetween(1, Integer.MAX_VALUE));
} }

View file

@ -12,15 +12,20 @@ import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut; import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.explain.ExplainRequest; import org.elasticsearch.action.explain.ExplainRequest;
import org.elasticsearch.action.explain.ExplainResponse; import org.elasticsearch.action.explain.ExplainResponse;
import org.elasticsearch.action.fieldcaps.FieldCapabilities; import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
import org.elasticsearch.action.search.ClosePointInTimeResponse;
import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.SearchScrollRequest;
@ -66,6 +71,7 @@ import org.elasticsearch.search.aggregations.metrics.WeightedAvg;
import org.elasticsearch.search.aggregations.metrics.WeightedAvgAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.WeightedAvgAggregationBuilder;
import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfig; import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfig;
import org.elasticsearch.search.aggregations.support.ValueType; import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
@ -1370,6 +1376,43 @@ public class SearchIT extends ESRestHighLevelClientTestCase {
assertSecondHit(searchResponse, hasId("1")); assertSecondHit(searchResponse, hasId("1"));
} }
public void testPointInTime() throws Exception {
int numDocs = between(50, 100);
for (int i = 0; i < numDocs; i++) {
IndexRequest indexRequest = new IndexRequest("test-index").id(Integer.toString(i)).source("field", i);
highLevelClient().index(indexRequest, RequestOptions.DEFAULT);
}
highLevelClient().indices().refresh(new RefreshRequest("test-index"), RequestOptions.DEFAULT);
OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest("test-index").keepAlive(TimeValue.timeValueMinutes(between(1, 5)));
String pitID = execute(openRequest, highLevelClient()::openPointInTime, highLevelClient()::openPointInTimeAsync).getPointInTimeId();
try {
int totalHits = 0;
SearchResponse searchResponse = null;
do {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(between(5, 10)).sort("field");
PointInTimeBuilder pointInTimeBuilder = new PointInTimeBuilder(pitID);
if (randomBoolean()) {
pointInTimeBuilder.setKeepAlive(TimeValue.timeValueMinutes(between(1, 5)));
}
searchSourceBuilder.pointInTimeBuilder(pointInTimeBuilder);
if (searchResponse != null) {
SearchHit last = searchResponse.getHits().getHits()[searchResponse.getHits().getHits().length - 1];
searchSourceBuilder.searchAfter(last.getSortValues());
}
SearchRequest searchRequest = new SearchRequest().source(searchSourceBuilder);
searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync);
assertThat(searchResponse.pointInTimeId(), equalTo(pitID));
totalHits += searchResponse.getHits().getHits().length;
} while (searchResponse.getHits().getHits().length > 0);
assertThat(totalHits, equalTo(numDocs));
} finally {
ClosePointInTimeResponse closeResponse = execute(new ClosePointInTimeRequest(pitID),
highLevelClient()::closePointInTime, highLevelClient()::closePointInTimeAsync);
assertTrue(closeResponse.isSucceeded());
}
}
private static void assertCountHeader(CountResponse countResponse) { private static void assertCountHeader(CountResponse countResponse) {
assertEquals(0, countResponse.getSkippedShards()); assertEquals(0, countResponse.getSkippedShards());
assertEquals(0, countResponse.getFailedShards()); assertEquals(0, countResponse.getFailedShards());

View file

@ -23,8 +23,11 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.OpenPointInTimeResponse;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.SearchScrollRequest;
@ -80,6 +83,7 @@ import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket; import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.Avg; import org.elasticsearch.search.aggregations.metrics.Avg;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
@ -722,6 +726,101 @@ public class SearchDocumentationIT extends ESRestHighLevelClientTestCase {
} }
} }
public void testPointInTime() throws Exception {
RestHighLevelClient client = highLevelClient();
BulkRequest request = new BulkRequest();
request.add(new IndexRequest("posts").id("1").source(XContentType.JSON, "lang", "Java"));
request.add(new IndexRequest("posts").id("2").source(XContentType.JSON, "lang", "Python"));
request.add(new IndexRequest("posts").id("3").source(XContentType.JSON, "lang", "Go"));
request.add(new IndexRequest("posts").id("4").source(XContentType.JSON, "lang", "Rust"));
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
assertSame(RestStatus.OK, bulkResponse.status());
assertFalse(bulkResponse.hasFailures());
// tag::open-point-in-time
OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest("posts"); // <1>
openRequest.keepAlive(TimeValue.timeValueMinutes(30)); // <2>
OpenPointInTimeResponse openResponse = client.openPointInTime(openRequest, RequestOptions.DEFAULT);
String pitId = openResponse.getPointInTimeId(); // <3>
assertNotNull(pitId);
// end::open-point-in-time
// tag::search-point-in-time
SearchRequest searchRequest = new SearchRequest();
final PointInTimeBuilder pointInTimeBuilder = new PointInTimeBuilder(pitId); // <1>
pointInTimeBuilder.setKeepAlive("2m"); // <2>
searchRequest.source(new SearchSourceBuilder().pointInTimeBuilder(pointInTimeBuilder)); // <3>
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
assertThat(searchResponse.pointInTimeId(), equalTo(pitId));
// end::search-point-in-time
// tag::close-point-in-time
ClosePointInTimeRequest closeRequest = new ClosePointInTimeRequest(pitId); // <1>
ClearScrollResponse closeResponse = client.closePointInTime(closeRequest, RequestOptions.DEFAULT);
assertTrue(closeResponse.isSucceeded());
// end::close-point-in-time
// Open a point in time with optional arguments
{
openRequest = new OpenPointInTimeRequest("posts").keepAlive(TimeValue.timeValueMinutes(10));
// tag::open-point-in-time-indices-option
openRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED); // <1>
// end::open-point-in-time-indices-option
// tag::open-point-in-time-routing
openRequest.routing("routing"); // <1>
// end::explain-request-routing
// tag::open-point-in-time-preference
openRequest.preference("_local"); // <1>
// end::open-point-in-time-preference
openResponse = client.openPointInTime(openRequest, RequestOptions.DEFAULT);
pitId = openResponse.getPointInTimeId();
client.closePointInTime(new ClosePointInTimeRequest(pitId), RequestOptions.DEFAULT);
}
}
public void testSearchAfterWithPointInTime() throws Exception {
RestHighLevelClient client = highLevelClient();
int numDocs = between(50, 100);
BulkRequest request = new BulkRequest();
for (int i = 0; i < numDocs; i++) {
request.add(new IndexRequest("posts").id(Integer.toString(i)).source(XContentType.JSON, "field", i));
}
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
assertSame(RestStatus.OK, bulkResponse.status());
assertFalse(bulkResponse.hasFailures());
// tag::search-after-with-point-in-time
OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest("posts");
openRequest.keepAlive(TimeValue.timeValueMinutes(20));
String pitId = client.openPointInTime(openRequest, RequestOptions.DEFAULT).getPointInTimeId(); // <1>
assertNotNull(pitId);
SearchResponse searchResponse = null;
int totalHits = 0;
do {
SearchRequest searchRequest = new SearchRequest().source(new SearchSourceBuilder().sort("field").size(5)); // <2>
if (searchResponse != null) {
final SearchHit[] lastHits = searchResponse.getHits().getHits();
searchRequest.source().searchAfter(lastHits[lastHits.length - 1].getSortValues()); // <3>
}
searchRequest.source().pointInTimeBuilder(new PointInTimeBuilder(pitId)); // <4>
searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
assertThat(searchResponse.pointInTimeId(), equalTo(pitId));
totalHits += searchResponse.getHits().getHits().length;
} while (searchResponse.getHits().getHits().length > 0);
assertThat(totalHits, equalTo(numDocs));
ClearScrollResponse closeResponse = client.closePointInTime(new ClosePointInTimeRequest(pitId), RequestOptions.DEFAULT); // <5>
assertTrue(closeResponse.isSucceeded());
// end::search-after-with-point-in-time
}
public void testSearchTemplateWithInlineScript() throws Exception { public void testSearchTemplateWithInlineScript() throws Exception {
indexSearchTestData(); indexSearchTestData();
RestHighLevelClient client = highLevelClient(); RestHighLevelClient client = highLevelClient();

View file

@ -0,0 +1,78 @@
[[java-rest-high-search-point-in-time]]
=== Open a point in time
A point in time must be opened before being used in search requests.
An OpenPointInTimeRequest requires an `index` and `keepAlive` arguments:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/SearchDocumentationIT.java[open-point-in-time]
--------------------------------------------------
<1> Create an `OpenPointInTimeRequest` with the target indices
<2> Set the `keep_alive` - a required parameter, which tells
Elasticsearch how long it should keep a point in time around.
<3> Read the returned point in time id, which points to the search context that's
being kept alive and will be used in the search requests.
==== Optional arguments
The following arguments can optionally be provided:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/SearchDocumentationIT.java[open-point-in-time-indices-option]
--------------------------------------------------
<1> Setting `IndicesOptions` controls how unavailable indices are resolved and
how wildcard expressions are expanded
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/SearchDocumentationIT.java[open-point-in-time-routing]
--------------------------------------------------
<1> Set a routing parameter
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/SearchDocumentationIT.java[open-point-in-time-preference]
--------------------------------------------------
<1> Use the preference parameter e.g. to execute the search to prefer local
shards. The default is to randomize across shards.
=== Search with point in time
A point in time can be passed to a search request via a PointInTimeBuilder,
which requires a point in time ID returned from the open API.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/SearchDocumentationIT.java[search-point-in-time]
--------------------------------------------------
<1> Create a PointInTimeBuilder with a PIT id
<2> (Optional) Set the keep alive of a point in time
<3> Pass a point in time to a search request
A search request with a point in time does not accept these parameters:
`indices`, `indicesOptions` `routing`, `preference`, and `ccsMinimizeRoundtrips`.
==== Paginate search results with point in time
A point in time can be used in search after requests to paginate search results.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/SearchDocumentationIT.java[search-after-with-point-in-time]
--------------------------------------------------
<1> Open a point in time that will be used in multiple search_after requests
<2> Create a search request with the sort parameter
<3> Set the search_after parameter using the sort values from the previous page
<4> Pass a point in time to a search request
<5> Close the point in time
=== Close point in time
Point in time should be closed as soon as they are no longer used in search requests.
A ClosePointInTime request requires a point in time id argument:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/SearchDocumentationIT.java[close-point-in-time]
--------------------------------------------------
<1> Create a close point in time request with a PIT id

View file

@ -78,15 +78,9 @@ public class CCSPointInTimeIT extends AbstractMultiClustersTestCase {
} }
private String openPointInTime(String[] indices, TimeValue keepAlive) { private String openPointInTime(String[] indices, TimeValue keepAlive) {
OpenPointInTimeRequest request = new OpenPointInTimeRequest( OpenPointInTimeRequest request = new OpenPointInTimeRequest(indices).keepAlive(keepAlive);
indices,
OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS,
keepAlive,
null,
null
);
final OpenPointInTimeResponse response = client().execute(OpenPointInTimeAction.INSTANCE, request).actionGet(); final OpenPointInTimeResponse response = client().execute(OpenPointInTimeAction.INSTANCE, request).actionGet();
return response.getSearchContextId(); return response.getPointInTimeId();
} }
private void closePointInTime(String readerId) { private void closePointInTime(String readerId) {

View file

@ -452,15 +452,9 @@ public class PointInTimeIT extends ESIntegTestCase {
} }
private String openPointInTime(String[] indices, TimeValue keepAlive) { private String openPointInTime(String[] indices, TimeValue keepAlive) {
OpenPointInTimeRequest request = new OpenPointInTimeRequest( OpenPointInTimeRequest request = new OpenPointInTimeRequest(indices).keepAlive(keepAlive);
indices,
OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS,
keepAlive,
null,
null
);
final OpenPointInTimeResponse response = client().execute(OpenPointInTimeAction.INSTANCE, request).actionGet(); final OpenPointInTimeResponse response = client().execute(OpenPointInTimeAction.INSTANCE, request).actionGet();
return response.getSearchContextId(); return response.getPointInTimeId();
} }
private void closePointInTime(String readerId) { private void closePointInTime(String readerId) {

View file

@ -30,8 +30,8 @@ public class ClearScrollResponse extends ActionResponse implements StatusToXCont
private static final ParseField SUCCEEDED = new ParseField("succeeded"); private static final ParseField SUCCEEDED = new ParseField("succeeded");
private static final ParseField NUMFREED = new ParseField("num_freed"); private static final ParseField NUMFREED = new ParseField("num_freed");
private static final ConstructingObjectParser<ClearScrollResponse, Void> PARSER = new ConstructingObjectParser<>("clear_scroll", private static final ConstructingObjectParser<ClosePointInTimeResponse, Void> PARSER = new ConstructingObjectParser<>("clear_scroll",
true, a -> new ClearScrollResponse((boolean)a[0], (int)a[1])); true, a -> new ClosePointInTimeResponse((boolean) a[0], (int) a[1]));
static { static {
PARSER.declareField(constructorArg(), (parser, context) -> parser.booleanValue(), SUCCEEDED, ObjectParser.ValueType.BOOLEAN); PARSER.declareField(constructorArg(), (parser, context) -> parser.booleanValue(), SUCCEEDED, ObjectParser.ValueType.BOOLEAN);
PARSER.declareField(constructorArg(), (parser, context) -> parser.intValue(), NUMFREED, ObjectParser.ValueType.INT); PARSER.declareField(constructorArg(), (parser, context) -> parser.intValue(), NUMFREED, ObjectParser.ValueType.INT);
@ -83,7 +83,7 @@ public class ClearScrollResponse extends ActionResponse implements StatusToXCont
/** /**
* Parse the clear scroll response body into a new {@link ClearScrollResponse} object * Parse the clear scroll response body into a new {@link ClearScrollResponse} object
*/ */
public static ClearScrollResponse fromXContent(XContentParser parser) throws IOException { public static ClosePointInTimeResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.apply(parser, null); return PARSER.apply(parser, null);
} }

View file

@ -27,23 +27,18 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
public final class OpenPointInTimeRequest extends ActionRequest implements IndicesRequest.Replaceable { public final class OpenPointInTimeRequest extends ActionRequest implements IndicesRequest.Replaceable {
private String[] indices; private String[] indices;
private final IndicesOptions indicesOptions; private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
private final TimeValue keepAlive; private TimeValue keepAlive;
@Nullable @Nullable
private final String routing; private String routing;
@Nullable @Nullable
private final String preference; private String preference;
public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosed(); public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosed();
public OpenPointInTimeRequest(String[] indices, IndicesOptions indicesOptions, public OpenPointInTimeRequest(String... indices) {
TimeValue keepAlive, String routing, String preference) { this.indices = Objects.requireNonNull(indices, "[index] is not specified");
this.indices = Objects.requireNonNull(indices);
this.indicesOptions = Objects.requireNonNull(indicesOptions);
this.keepAlive = keepAlive;
this.routing = routing;
this.preference = preference;
} }
public OpenPointInTimeRequest(StreamInput in) throws IOException { public OpenPointInTimeRequest(StreamInput in) throws IOException {
@ -68,7 +63,7 @@ public final class OpenPointInTimeRequest extends ActionRequest implements Indic
@Override @Override
public ActionRequestValidationException validate() { public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null; ActionRequestValidationException validationException = null;
if (indices.length == 0) { if (indices == null || indices.length == 0) {
validationException = addValidationError("[index] is not specified", validationException); validationException = addValidationError("[index] is not specified", validationException);
} }
if (keepAlive == null) { if (keepAlive == null) {
@ -93,23 +88,46 @@ public final class OpenPointInTimeRequest extends ActionRequest implements Indic
return indicesOptions; return indicesOptions;
} }
@Override public OpenPointInTimeRequest indicesOptions(IndicesOptions indicesOptions) {
public boolean includeDataStreams() { this.indicesOptions = Objects.requireNonNull(indicesOptions, "[indices_options] parameter must be non null");
return true; return this;
} }
public TimeValue keepAlive() { public TimeValue keepAlive() {
return keepAlive; return keepAlive;
} }
/**
* Set keep alive for the point in time
*/
public OpenPointInTimeRequest keepAlive(TimeValue keepAlive) {
this.keepAlive = Objects.requireNonNull(keepAlive, "[keep_alive] parameter must be non null");
return this;
}
public String routing() { public String routing() {
return routing; return routing;
} }
public OpenPointInTimeRequest routing(String routing) {
this.routing = routing;
return this;
}
public String preference() { public String preference() {
return preference; return preference;
} }
public OpenPointInTimeRequest preference(String preference) {
this.preference = preference;
return this;
}
@Override
public boolean includeDataStreams() {
return true;
}
@Override @Override
public String getDescription() { public String getDescription() {
return "open search context: indices [" + String.join(",", indices) + "] keep_alive [" + keepAlive + "]"; return "open search context: indices [" + String.join(",", indices) + "] keep_alive [" + keepAlive + "]";

View file

@ -12,40 +12,55 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException; import java.io.IOException;
import java.util.Objects; import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
public final class OpenPointInTimeResponse extends ActionResponse implements ToXContentObject { public final class OpenPointInTimeResponse extends ActionResponse implements ToXContentObject {
private static final ParseField ID = new ParseField("id"); private static final ParseField ID = new ParseField("id");
private final String searchContextId; private static final ConstructingObjectParser<OpenPointInTimeResponse, Void> PARSER;
public OpenPointInTimeResponse(String searchContextId) { static {
this.searchContextId = Objects.requireNonNull(searchContextId); PARSER = new ConstructingObjectParser<>("open_point_in_time", true, a -> new OpenPointInTimeResponse((String) a[0]));
PARSER.declareField(constructorArg(), (parser, context) -> parser.text(), ID, ObjectParser.ValueType.STRING);
}
private final String pointInTimeId;
public OpenPointInTimeResponse(String pointInTimeId) {
this.pointInTimeId = Objects.requireNonNull(pointInTimeId, "Point in time parameter must be not null");
} }
public OpenPointInTimeResponse(StreamInput in) throws IOException { public OpenPointInTimeResponse(StreamInput in) throws IOException {
super(in); super(in);
searchContextId = in.readString(); pointInTimeId = in.readString();
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeString(searchContextId); out.writeString(pointInTimeId);
} }
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();
builder.field(ID.getPreferredName(), searchContextId); builder.field(ID.getPreferredName(), pointInTimeId);
builder.endObject(); builder.endObject();
return builder; return builder;
} }
public String getSearchContextId() { public String getPointInTimeId() {
return searchContextId; return pointInTimeId;
}
public static OpenPointInTimeResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
} }
} }

View file

@ -38,11 +38,11 @@ public class RestOpenPointInTimeAction extends BaseRestHandler {
@Override @Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
final String[] indices = Strings.splitStringByCommaToArray(request.param("index")); final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
final IndicesOptions indicesOptions = IndicesOptions.fromRequest(request, OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS); final OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest(indices);
final String routing = request.param("routing"); openRequest.indicesOptions(IndicesOptions.fromRequest(request, OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS));
final String preference = request.param("preference"); openRequest.routing(request.param("routing"));
final TimeValue keepAlive = TimeValue.parseTimeValue(request.param("keep_alive"), null, "keep_alive"); openRequest.preference(request.param("preference"));
final OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest(indices, indicesOptions, keepAlive, routing, preference); openRequest.keepAlive(TimeValue.parseTimeValue(request.param("keep_alive"), null, "keep_alive"));
return channel -> client.execute(OpenPointInTimeAction.INSTANCE, openRequest, new RestToXContentListener<>(channel)); return channel -> client.execute(OpenPointInTimeAction.INSTANCE, openRequest, new RestToXContentListener<>(channel));
} }
} }

View file

@ -89,10 +89,10 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
private Integer preFilterShardSize; private Integer preFilterShardSize;
private Boolean ccsMinimizeRoundtrips; private boolean ccsMinimizeRoundtrips;
@Nullable @Nullable
private Version minCompatibleShardNode; private final Version minCompatibleShardNode;
public static final IndicesOptions DEFAULT_INDICES_OPTIONS = public static final IndicesOptions DEFAULT_INDICES_OPTIONS =
IndicesOptions.strictExpandOpenAndForbidClosedIgnoreThrottled(); IndicesOptions.strictExpandOpenAndForbidClosedIgnoreThrottled();
@ -225,10 +225,10 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
finalReduce = true; finalReduce = true;
} }
ccsMinimizeRoundtrips = in.readBoolean(); ccsMinimizeRoundtrips = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_7_12_0)) { if (in.getVersion().onOrAfter(Version.V_7_12_0) && in.readBoolean()) {
if (in.readBoolean()) { minCompatibleShardNode = Version.readVersion(in);
minCompatibleShardNode = Version.readVersion(in); } else {
} minCompatibleShardNode = null;
} }
} }
@ -410,6 +410,13 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
this.ccsMinimizeRoundtrips = ccsMinimizeRoundtrips; this.ccsMinimizeRoundtrips = ccsMinimizeRoundtrips;
} }
/**
* Returns the default value of {@link #ccsMinimizeRoundtrips} of a search request
*/
public static boolean defaultCcsMinimizeRoundtrips(SearchRequest request) {
return request.minCompatibleShardNode == null;
}
/** /**
* A comma separated list of routing values to control the shards the search will be executed on. * A comma separated list of routing values to control the shards the search will be executed on.
*/ */

View file

@ -50,8 +50,8 @@ public final class PointInTimeBuilder implements Writeable, ToXContentObject {
private transient SearchContextId searchContextId; // lazily decoded from the encodedId private transient SearchContextId searchContextId; // lazily decoded from the encodedId
private TimeValue keepAlive; private TimeValue keepAlive;
public PointInTimeBuilder(String encodedId) { public PointInTimeBuilder(String pitID) {
this.encodedId = Objects.requireNonNull(encodedId); this.encodedId = Objects.requireNonNull(pitID, "Point in time ID must be provided");
} }
public PointInTimeBuilder(StreamInput in) throws IOException { public PointInTimeBuilder(StreamInput in) throws IOException {
@ -110,6 +110,14 @@ public final class PointInTimeBuilder implements Writeable, ToXContentObject {
return this; return this;
} }
/**
* If specified, the search layer will keep this point in time around for at least the given keep-alive.
* Otherwise, the point in time will be kept around until the original keep alive elapsed.
*/
public PointInTimeBuilder setKeepAlive(String keepAlive) {
return setKeepAlive(TimeValue.parseTimeValue(keepAlive, "keep_alive"));
}
@Nullable @Nullable
public TimeValue getKeepAlive() { public TimeValue getKeepAlive() {
return keepAlive; return keepAlive;

View file

@ -224,13 +224,8 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
final String pitId; final String pitId;
final SubmitAsyncSearchRequest request; final SubmitAsyncSearchRequest request;
if (randomBoolean()) { if (randomBoolean()) {
OpenPointInTimeRequest openPIT = new OpenPointInTimeRequest( OpenPointInTimeRequest openPIT = new OpenPointInTimeRequest(indexName).keepAlive(TimeValue.timeValueMinutes(between(5, 10)));
new String[]{indexName}, pitId = client().execute(OpenPointInTimeAction.INSTANCE, openPIT).actionGet().getPointInTimeId();
OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS,
TimeValue.timeValueMinutes(between(5, 10)),
null,
null);
pitId = client().execute(OpenPointInTimeAction.INSTANCE, openPIT).actionGet().getSearchContextId();
final PointInTimeBuilder pit = new PointInTimeBuilder(pitId); final PointInTimeBuilder pit = new PointInTimeBuilder(pitId);
if (randomBoolean()) { if (randomBoolean()) {
pit.setKeepAlive(TimeValue.timeValueMillis(randomIntBetween(1, 3600))); pit.setKeepAlive(TimeValue.timeValueMillis(randomIntBetween(1, 3600)));

View file

@ -128,15 +128,11 @@ public class PITAwareQueryClient extends BasicQueryClient {
} }
private <Response> void openPIT(ActionListener<Response> listener, Runnable runnable) { private <Response> void openPIT(ActionListener<Response> listener, Runnable runnable) {
OpenPointInTimeRequest request = new OpenPointInTimeRequest( OpenPointInTimeRequest request = new OpenPointInTimeRequest(indices)
indices, .indicesOptions(IndexResolver.FIELD_CAPS_INDICES_OPTIONS)
IndexResolver.FIELD_CAPS_INDICES_OPTIONS, .keepAlive(keepAlive);
keepAlive,
null,
null
);
client.execute(OpenPointInTimeAction.INSTANCE, request, wrap(r -> { client.execute(OpenPointInTimeAction.INSTANCE, request, wrap(r -> {
pitId = r.getSearchContextId(); pitId = r.getPointInTimeId();
runnable.run(); runnable.run();
}, },
listener::onFailure)); listener::onFailure));

View file

@ -206,9 +206,10 @@ public class FrozenIndexIT extends ESIntegTestCase {
client().prepareIndex(indexName).setSource("created_date", "2011-02-02").get(); client().prepareIndex(indexName).setSource("created_date", "2011-02-02").get();
} }
assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest(indexName)).actionGet()); assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest(indexName)).actionGet());
final String pitId = client().execute(OpenPointInTimeAction.INSTANCE, final OpenPointInTimeRequest openPointInTimeRequest = new OpenPointInTimeRequest(indexName).
new OpenPointInTimeRequest(new String[]{indexName}, IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED, indicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED)
TimeValue.timeValueMinutes(2), null, null)).actionGet().getSearchContextId(); .keepAlive(TimeValue.timeValueMinutes(2));
final String pitId = client().execute(OpenPointInTimeAction.INSTANCE, openPointInTimeRequest).actionGet().getPointInTimeId();
try { try {
SearchResponse resp = client().prepareSearch() SearchResponse resp = client().prepareSearch()
.setIndices(indexName) .setIndices(indexName)

View file

@ -83,10 +83,11 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
} }
String openReaders(TimeValue keepAlive, String... indices) { String openReaders(TimeValue keepAlive, String... indices) {
OpenPointInTimeRequest request = new OpenPointInTimeRequest( OpenPointInTimeRequest request = new OpenPointInTimeRequest(indices)
indices, IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED, keepAlive, null, null); .indicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED)
.keepAlive(keepAlive);
final OpenPointInTimeResponse response = client().execute(OpenPointInTimeAction.INSTANCE, request).actionGet(); final OpenPointInTimeResponse response = client().execute(OpenPointInTimeAction.INSTANCE, request).actionGet();
return response.getSearchContextId(); return response.getPointInTimeId();
} }
public void testCloseFreezeAndOpen() throws Exception { public void testCloseFreezeAndOpen() throws Exception {

View file

@ -141,16 +141,10 @@ public class RetrySearchIntegTests extends BaseSearchableSnapshotsIntegTestCase
mountSnapshot(repositoryName, snapshotOne.getName(), indexName, indexName, indexSettings); mountSnapshot(repositoryName, snapshotOne.getName(), indexName, indexName, indexSettings);
ensureGreen(indexName); ensureGreen(indexName);
final String pitId = client().execute( final OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest(indexName).indicesOptions(
OpenPointInTimeAction.INSTANCE, IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED
new OpenPointInTimeRequest( ).keepAlive(TimeValue.timeValueMinutes(2));
new String[] { indexName }, final String pitId = client().execute(OpenPointInTimeAction.INSTANCE, openRequest).actionGet().getPointInTimeId();
IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED,
TimeValue.timeValueMinutes(2),
null,
null
)
).actionGet().getSearchContextId();
try { try {
SearchResponse resp = client().prepareSearch() SearchResponse resp = client().prepareSearch()
.setIndices(indexName) .setIndices(indexName)

View file

@ -1029,12 +1029,11 @@ public class FieldLevelSecurityTests extends SecurityIntegTestCase {
} }
static String openPointInTime(String userName, TimeValue keepAlive, String... indices) { static String openPointInTime(String userName, TimeValue keepAlive, String... indices) {
OpenPointInTimeRequest request = new OpenPointInTimeRequest( OpenPointInTimeRequest request = new OpenPointInTimeRequest(indices).keepAlive(keepAlive);
indices, OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS, keepAlive, null, null);
final OpenPointInTimeResponse response = client() final OpenPointInTimeResponse response = client()
.filterWithHeader(Collections.singletonMap(BASIC_AUTH_HEADER, basicAuthHeaderValue(userName, USERS_PASSWD))) .filterWithHeader(Collections.singletonMap(BASIC_AUTH_HEADER, basicAuthHeaderValue(userName, USERS_PASSWD)))
.execute(OpenPointInTimeAction.INSTANCE, request).actionGet(); .execute(OpenPointInTimeAction.INSTANCE, request).actionGet();
return response.getSearchContextId(); return response.getPointInTimeId();
} }
public void testPointInTimeId() throws Exception { public void testPointInTimeId() throws Exception {

View file

@ -630,10 +630,14 @@ public class AuthorizationServiceTests extends ESTestCase {
"other_cluster:" + randomFrom(randomAlphaOfLength(5), "*", randomAlphaOfLength(4) + "*"), "other_cluster:" + randomFrom(randomAlphaOfLength(5), "*", randomAlphaOfLength(4) + "*"),
"other_cluster:" + randomFrom(randomAlphaOfLength(5), "*", randomAlphaOfLength(4) + "*") "other_cluster:" + randomFrom(randomAlphaOfLength(5), "*", randomAlphaOfLength(4) + "*")
}; };
final OpenPointInTimeRequest openPointInTimeRequest = new OpenPointInTimeRequest( final OpenPointInTimeRequest openPointInTimeRequest = new OpenPointInTimeRequest(indices)
indices, OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS, TimeValue.timeValueMinutes(randomLongBetween(1, 10)), .keepAlive(TimeValue.timeValueMinutes(randomLongBetween(1, 10)));
randomAlphaOfLength(5), randomAlphaOfLength(5) if (randomBoolean()) {
); openPointInTimeRequest.routing(randomAlphaOfLength(5));
}
if (randomBoolean()) {
openPointInTimeRequest.preference(randomAlphaOfLength(5));
}
if (hasLocalIndices) { if (hasLocalIndices) {
assertThrowsAuthorizationException( assertThrowsAuthorizationException(
() -> authorize(authentication, OpenPointInTimeAction.NAME, openPointInTimeRequest), () -> authorize(authentication, OpenPointInTimeAction.NAME, openPointInTimeRequest),