mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-24 23:27:25 -04:00
Propagate ?master_timeout query parameter from CCR apis to downstreams (#105168)
* Propagate ?master_timeout query parameter from crr put follow api * Update docs/changelog/105168.yaml * fix sily mistake * spotless * fix 2 * Add ?master_timeout query param to apis which use it * Update rest api specs + yaml tests * Add master_timeout (+timeout) to remaining api endpoints * Update 105168.yaml Correct message * Enable randomly ?master_timeout param to ccr IT tests * Add timeout param to RestCcrStatsAction * propagate master_timeout param between put_follow -> resume_follow calls * Propagate master_timeout down to persistent task layer * Add transport version for ccr stats request object change * Add BwC test for CcrStatsAction.Request obj
This commit is contained in:
parent
412f2f5617
commit
1e253a04fc
56 changed files with 368 additions and 17 deletions
5
docs/changelog/105168.yaml
Normal file
5
docs/changelog/105168.yaml
Normal file
|
@ -0,0 +1,5 @@
|
|||
pr: 105168
|
||||
summary: Add ?master_timeout query parameter to ccr apis
|
||||
area: CCR
|
||||
type: bug
|
||||
issues: []
|
|
@ -54,6 +54,13 @@ This API deletes a configured collection of
|
|||
`<auto_follow_pattern_name>`::
|
||||
(Required, string) Specifies the auto-follow pattern collection to delete.
|
||||
|
||||
[[ccr-delete-auto-follow-pattern-query-params]]
|
||||
==== {api-query-parms-title}
|
||||
|
||||
`master_timeout`::
|
||||
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
|
||||
a connection to the master node. If no response is received before the timeout
|
||||
expires, the request fails and returns an error. Defaults to `30s`.
|
||||
|
||||
[[ccr-delete-auto-follow-pattern-examples]]
|
||||
==== {api-examples-title}
|
||||
|
|
|
@ -72,6 +72,14 @@ This API will return the specified auto-follow pattern collection.
|
|||
to retrieve. If you do not specify a name, the API returns information for all
|
||||
collections.
|
||||
|
||||
[[ccr-get-auto-follow-pattern-query-params]]
|
||||
==== {api-query-parms-title}
|
||||
|
||||
`master_timeout`::
|
||||
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
|
||||
a connection to the master node. If no response is received before the timeout
|
||||
expires, the request fails and returns an error. Defaults to `30s`.
|
||||
|
||||
[[ccr-get-auto-follow-pattern-examples]]
|
||||
==== {api-examples-title}
|
||||
|
||||
|
|
|
@ -40,6 +40,13 @@ meantime.
|
|||
`<auto_follow_pattern_name>`::
|
||||
(Required, string) Name of the auto-follow pattern to pause.
|
||||
|
||||
[[ccr-pause-auto-follow-pattern-query-params]]
|
||||
==== {api-query-parms-title}
|
||||
|
||||
`master_timeout`::
|
||||
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
|
||||
a connection to the master node. If no response is received before the timeout
|
||||
expires, the request fails and returns an error. Defaults to `30s`.
|
||||
|
||||
[[ccr-pause-auto-follow-pattern-examples]]
|
||||
==== {api-examples-title}
|
||||
|
|
|
@ -71,6 +71,14 @@ the new patterns.
|
|||
`<auto_follow_pattern_name>`::
|
||||
(Required, string) The name of the collection of auto-follow patterns.
|
||||
|
||||
[[ccr-put-auto-follow-pattern-query-params]]
|
||||
==== {api-query-parms-title}
|
||||
|
||||
`master_timeout`::
|
||||
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
|
||||
a connection to the master node. If no response is received before the timeout
|
||||
expires, the request fails and returns an error. Defaults to `30s`.
|
||||
|
||||
[[ccr-put-auto-follow-pattern-request-body]]
|
||||
==== {api-request-body-title}
|
||||
|
||||
|
|
|
@ -35,6 +35,13 @@ have been deleted or closed in the meantime.
|
|||
`<auto_follow_pattern_name>`::
|
||||
(Required, string) Specifies the name of the auto-follow pattern to resume.
|
||||
|
||||
[[ccr-resume-auto-follow-pattern-query-params]]
|
||||
==== {api-query-parms-title}
|
||||
|
||||
`master_timeout`::
|
||||
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
|
||||
a connection to the master node. If no response is received before the timeout
|
||||
expires, the request fails and returns an error. Defaults to `30s`.
|
||||
|
||||
[[ccr-resume-auto-follow-pattern-examples]]
|
||||
==== {api-examples-title}
|
||||
|
|
|
@ -49,6 +49,14 @@ replication options and whether the follower indices are active or paused.
|
|||
`<index>`::
|
||||
(Required, string) A comma-delimited list of follower index patterns.
|
||||
|
||||
[[ccr-get-follow-info-query-params]]
|
||||
==== {api-query-parms-title}
|
||||
|
||||
`master_timeout`::
|
||||
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
|
||||
a connection to the master node. If no response is received before the timeout
|
||||
expires, the request fails and returns an error. Defaults to `30s`.
|
||||
|
||||
[role="child_attributes"]
|
||||
[[ccr-get-follow-info-response-body]]
|
||||
==== {api-response-body-title}
|
||||
|
|
|
@ -56,6 +56,12 @@ following tasks associated with each shard for the specified indices.
|
|||
`<index>`::
|
||||
(Required, string) A comma-delimited list of index patterns.
|
||||
|
||||
[[ccr-get-follow-stats-query-params]]
|
||||
==== {api-query-parms-title}
|
||||
|
||||
`timeout`::
|
||||
(Optional, time) Controls the amount of time to wait for results. Defaults to unlimited.
|
||||
|
||||
[role="child_attributes"]
|
||||
[[ccr-get-follow-stats-response-body]]
|
||||
==== {api-response-body-title}
|
||||
|
|
|
@ -100,6 +100,12 @@ the <<ccr-post-unfollow,unfollow API>> is invoked.
|
|||
`<leader_index>`::
|
||||
(Required, string) The name of the leader index.
|
||||
|
||||
[[ccr-post-forget-follower-query-params]]
|
||||
==== {api-query-parms-title}
|
||||
|
||||
`timeout`::
|
||||
(Optional, time) Controls the amount of time to wait for results. Defaults to unlimited.
|
||||
|
||||
[[ccr-post-forget-follower-request-body]]
|
||||
==== {api-request-body-title}
|
||||
|
||||
|
|
|
@ -53,6 +53,14 @@ following task.
|
|||
`<follower_index>`::
|
||||
(Required, string) The name of the follower index.
|
||||
|
||||
[[ccr-post-pause-follow-query-params]]
|
||||
==== {api-query-parms-title}
|
||||
|
||||
`master_timeout`::
|
||||
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
|
||||
a connection to the master node. If no response is received before the timeout
|
||||
expires, the request fails and returns an error. Defaults to `30s`.
|
||||
|
||||
[[ccr-post-pause-follow-examples]]
|
||||
==== {api-examples-title}
|
||||
|
||||
|
|
|
@ -66,6 +66,14 @@ returns, the follower index will resume fetching operations from the leader inde
|
|||
`<follower_index>`::
|
||||
(Required, string) The name of the follower index.
|
||||
|
||||
[[ccr-post-resume-follow-query-params]]
|
||||
==== {api-query-parms-title}
|
||||
|
||||
`master_timeout`::
|
||||
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
|
||||
a connection to the master node. If no response is received before the timeout
|
||||
expires, the request fails and returns an error. Defaults to `30s`.
|
||||
|
||||
[[ccr-post-resume-follow-request-body]]
|
||||
==== {api-request-body-title}
|
||||
include::../follow-request-body.asciidoc[tag=ccr-resume-follow-request-body]
|
||||
|
|
|
@ -60,6 +60,14 @@ irreversible operation.
|
|||
`<follower_index>`::
|
||||
(Required, string) The name of the follower index.
|
||||
|
||||
[[ccr-post-unfollow-query-params]]
|
||||
==== {api-query-parms-title}
|
||||
|
||||
`master_timeout`::
|
||||
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
|
||||
a connection to the master node. If no response is received before the timeout
|
||||
expires, the request fails and returns an error. Defaults to `30s`.
|
||||
|
||||
[[ccr-post-unfollow-examples]]
|
||||
==== {api-examples-title}
|
||||
|
||||
|
|
|
@ -65,6 +65,11 @@ referenced leader index. When this API returns, the follower index exists, and
|
|||
follower shard requires transferring all the remote Lucene segment files to
|
||||
the follower index.
|
||||
|
||||
`master_timeout`::
|
||||
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
|
||||
a connection to the master node. If no response is received before the timeout
|
||||
expires, the request fails and returns an error. Defaults to `30s`.
|
||||
|
||||
|
||||
[[ccr-put-follow-request-body]]
|
||||
==== {api-request-body-title}
|
||||
|
|
|
@ -50,6 +50,17 @@ This API gets {ccr} stats. This API will return all stats related to {ccr}. In
|
|||
particular, this API returns stats about auto-following, and returns the same
|
||||
shard-level stats as in the <<ccr-get-follow-stats,get follower stats API>>.
|
||||
|
||||
[[ccr-get-stats-query-params]]
|
||||
==== {api-query-parms-title}
|
||||
|
||||
`timeout`::
|
||||
(Optional, time) Controls the amount of time to wait for results. Defaults to unlimited.
|
||||
|
||||
`master_timeout`::
|
||||
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
|
||||
a connection to the master node. If no response is received before the timeout
|
||||
expires, the request fails and returns an error. Defaults to `30s`.
|
||||
|
||||
[role="child_attributes"]
|
||||
[[ccr-get-stats-response-body]]
|
||||
==== {api-response-body-title}
|
||||
|
|
|
@ -24,6 +24,12 @@
|
|||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"params":{
|
||||
"master_timeout":{
|
||||
"type":"time",
|
||||
"description":"Explicit operation timeout for connection to master node"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,10 @@
|
|||
"type":"string",
|
||||
"description":"Sets the number of shard copies that must be active before returning. Defaults to 0. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)",
|
||||
"default":"0"
|
||||
},
|
||||
"master_timeout":{
|
||||
"type":"time",
|
||||
"description":"Explicit operation timeout for connection to master node"
|
||||
}
|
||||
},
|
||||
"body":{
|
||||
|
|
|
@ -24,6 +24,12 @@
|
|||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"params":{
|
||||
"master_timeout":{
|
||||
"type":"time",
|
||||
"description":"Explicit operation timeout for connection to master node"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,12 @@
|
|||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"params":{
|
||||
"timeout":{
|
||||
"type":"time",
|
||||
"description":"Explicit operation timeout"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,12 @@
|
|||
}
|
||||
]
|
||||
},
|
||||
"params":{
|
||||
"timeout":{
|
||||
"type":"time",
|
||||
"description":"Explicit operation timeout"
|
||||
}
|
||||
},
|
||||
"body":{
|
||||
"description":"the name and UUID of the follower index, the name of the cluster containing the follower index, and the alias from the perspective of that cluster for the remote cluster containing the leader index",
|
||||
"required":true
|
||||
|
|
|
@ -30,6 +30,12 @@
|
|||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"params":{
|
||||
"master_timeout":{
|
||||
"type":"time",
|
||||
"description":"Explicit operation timeout for connection to master node"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,12 @@
|
|||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"params":{
|
||||
"master_timeout":{
|
||||
"type":"time",
|
||||
"description":"Explicit operation timeout for connection to master node"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,12 @@
|
|||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"params":{
|
||||
"master_timeout":{
|
||||
"type":"time",
|
||||
"description":"Explicit operation timeout for connection to master node"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,12 @@
|
|||
}
|
||||
]
|
||||
},
|
||||
"params":{
|
||||
"master_timeout":{
|
||||
"type":"time",
|
||||
"description":"Explicit operation timeout for connection to master node"
|
||||
}
|
||||
},
|
||||
"body":{
|
||||
"description":"The specification of the auto follow pattern",
|
||||
"required":true
|
||||
|
|
|
@ -24,6 +24,12 @@
|
|||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"params":{
|
||||
"master_timeout":{
|
||||
"type":"time",
|
||||
"description":"Explicit operation timeout for connection to master node"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,12 @@
|
|||
}
|
||||
]
|
||||
},
|
||||
"params":{
|
||||
"master_timeout":{
|
||||
"type":"time",
|
||||
"description":"Explicit operation timeout for connection to master node"
|
||||
}
|
||||
},
|
||||
"body":{
|
||||
"description":"The name of the leader index and other optional ccr related parameters",
|
||||
"required":false
|
||||
|
|
|
@ -18,6 +18,16 @@
|
|||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
"params":{
|
||||
"timeout":{
|
||||
"type":"time",
|
||||
"description":"Explicit operation timeout"
|
||||
},
|
||||
"master_timeout":{
|
||||
"type":"time",
|
||||
"description":"Explicit operation timeout for connection to master node"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,12 @@
|
|||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"params":{
|
||||
"master_timeout":{
|
||||
"type":"time",
|
||||
"description":"Explicit operation timeout for connection to master node"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -160,6 +160,7 @@ public class TransportVersions {
|
|||
public static final TransportVersion ADD_PERSISTENT_TASK_EXCEPTIONS = def(8_619_00_0);
|
||||
public static final TransportVersion ESQL_REDUCER_NODE_FRAGMENT = def(8_620_00_0);
|
||||
public static final TransportVersion FAILURE_STORE_ROLLOVER = def(8_621_00_0);
|
||||
public static final TransportVersion CCR_STATS_API_TIMEOUT_PARAM = def(8_622_00_0);
|
||||
|
||||
/*
|
||||
* STOP! READ THIS FIRST! No, really,
|
||||
|
|
|
@ -201,6 +201,7 @@
|
|||
- do:
|
||||
ccr.put_auto_follow_pattern:
|
||||
name: pattern_test
|
||||
master_timeout: 10s
|
||||
body:
|
||||
remote_cluster: local
|
||||
leader_index_patterns: ['logs-*']
|
||||
|
@ -224,6 +225,7 @@
|
|||
- do:
|
||||
ccr.pause_auto_follow_pattern:
|
||||
name: pattern_test
|
||||
master_timeout: 10s
|
||||
- is_true: acknowledged
|
||||
|
||||
- do:
|
||||
|
@ -243,6 +245,7 @@
|
|||
- do:
|
||||
ccr.resume_auto_follow_pattern:
|
||||
name: pattern_test
|
||||
master_timeout: 10s
|
||||
- is_true: acknowledged
|
||||
|
||||
- do:
|
||||
|
|
|
@ -40,6 +40,7 @@
|
|||
ccr.follow:
|
||||
index: bar
|
||||
wait_for_active_shards: 1
|
||||
master_timeout: 10s
|
||||
body:
|
||||
remote_cluster: local
|
||||
leader_index: foo
|
||||
|
|
|
@ -34,6 +34,7 @@
|
|||
ccr.follow:
|
||||
index: bar
|
||||
wait_for_active_shards: 1
|
||||
master_timeout: 10s
|
||||
body:
|
||||
remote_cluster: local
|
||||
leader_index: foo
|
||||
|
|
|
@ -734,17 +734,26 @@ public class AutoFollowIT extends CcrIntegTestCase {
|
|||
request.setLeaderIndexExclusionPatterns(exclusionPatterns);
|
||||
// Need to set this, because following an index in the same cluster
|
||||
request.setFollowIndexNamePattern("copy-{{leader_index}}");
|
||||
if (randomBoolean()) {
|
||||
request.masterNodeTimeout(randomFrom("10s", "20s", "30s"));
|
||||
}
|
||||
|
||||
assertTrue(followerClient().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged());
|
||||
}
|
||||
|
||||
private void deleteAutoFollowPattern(final String name) {
|
||||
DeleteAutoFollowPatternAction.Request request = new DeleteAutoFollowPatternAction.Request(name);
|
||||
if (randomBoolean()) {
|
||||
request.masterNodeTimeout(randomFrom("10s", "20s", "30s"));
|
||||
}
|
||||
assertTrue(followerClient().execute(DeleteAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged());
|
||||
}
|
||||
|
||||
private AutoFollowStats getAutoFollowStats() {
|
||||
CcrStatsAction.Request request = new CcrStatsAction.Request();
|
||||
if (randomBoolean()) {
|
||||
request.masterNodeTimeout(randomFrom("10s", "20s", "30s"));
|
||||
}
|
||||
return followerClient().execute(CcrStatsAction.INSTANCE, request).actionGet().getAutoFollowStats();
|
||||
}
|
||||
|
||||
|
@ -756,17 +765,26 @@ public class AutoFollowIT extends CcrIntegTestCase {
|
|||
|
||||
private void pauseAutoFollowPattern(final String name) {
|
||||
ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request(name, false);
|
||||
if (randomBoolean()) {
|
||||
request.masterNodeTimeout(randomFrom("10s", "20s", "30s"));
|
||||
}
|
||||
assertAcked(followerClient().execute(ActivateAutoFollowPatternAction.INSTANCE, request).actionGet());
|
||||
}
|
||||
|
||||
private void resumeAutoFollowPattern(final String name) {
|
||||
ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request(name, true);
|
||||
if (randomBoolean()) {
|
||||
request.masterNodeTimeout(randomFrom("10s", "20s", "30s"));
|
||||
}
|
||||
assertAcked(followerClient().execute(ActivateAutoFollowPatternAction.INSTANCE, request).actionGet());
|
||||
}
|
||||
|
||||
private AutoFollowMetadata.AutoFollowPattern getAutoFollowPattern(final String name) {
|
||||
GetAutoFollowPatternAction.Request request = new GetAutoFollowPatternAction.Request();
|
||||
request.setName(name);
|
||||
if (randomBoolean()) {
|
||||
request.masterNodeTimeout(randomFrom("10s", "20s", "30s"));
|
||||
}
|
||||
GetAutoFollowPatternAction.Response response = followerClient().execute(GetAutoFollowPatternAction.INSTANCE, request).actionGet();
|
||||
assertTrue(response.getAutoFollowPatterns().containsKey(name));
|
||||
return response.getAutoFollowPatterns().get(name);
|
||||
|
|
|
@ -80,6 +80,9 @@ public class TransportCcrStatsAction extends TransportMasterNodeAction<CcrStatsA
|
|||
) throws Exception {
|
||||
FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest();
|
||||
statsRequest.setParentTask(clusterService.localNode().getId(), task.getId());
|
||||
if (request.getTimeout() != null) {
|
||||
statsRequest.setTimeout(request.getTimeout());
|
||||
}
|
||||
client.execute(FollowStatsAction.INSTANCE, statsRequest, listener.delegateFailureAndWrap((l, statsResponse) -> {
|
||||
AutoFollowStats stats = autoFollowCoordinator.getStats();
|
||||
l.onResponse(new CcrStatsAction.Response(stats, statsResponse));
|
||||
|
|
|
@ -98,7 +98,7 @@ public class TransportPauseFollowAction extends AcknowledgedTransportMasterNodeA
|
|||
final ResponseHandler responseHandler = new ResponseHandler(shardFollowTaskIds.size(), listener);
|
||||
for (String taskId : shardFollowTaskIds) {
|
||||
final int taskSlot = i++;
|
||||
persistentTasksService.sendRemoveRequest(taskId, null, responseHandler.getActionListener(taskSlot));
|
||||
persistentTasksService.sendRemoveRequest(taskId, request.masterNodeTimeout(), responseHandler.getActionListener(taskSlot));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -297,6 +297,7 @@ public final class TransportPutFollowAction extends TransportMasterNodeAction<Pu
|
|||
ResumeFollowAction.Request resumeFollowRequest = new ResumeFollowAction.Request();
|
||||
resumeFollowRequest.setFollowerIndex(request.getFollowerIndex());
|
||||
resumeFollowRequest.setParameters(new FollowParameters(parameters));
|
||||
resumeFollowRequest.masterNodeTimeout(request.masterNodeTimeout());
|
||||
clientWithHeaders.execute(
|
||||
ResumeFollowAction.INSTANCE,
|
||||
resumeFollowRequest,
|
||||
|
|
|
@ -211,7 +211,7 @@ public class TransportResumeFollowAction extends AcknowledgedTransportMasterNode
|
|||
taskId,
|
||||
ShardFollowTask.NAME,
|
||||
shardFollowTask,
|
||||
null,
|
||||
request.masterNodeTimeout(),
|
||||
handler.getActionListener(shardId)
|
||||
);
|
||||
}
|
||||
|
|
|
@ -92,7 +92,7 @@ public class TransportUnfollowAction extends AcknowledgedTransportMasterNodeActi
|
|||
final ClusterState state,
|
||||
final ActionListener<AcknowledgedResponse> listener
|
||||
) {
|
||||
submitUnbatchedTask("unfollow_action", new ClusterStateUpdateTask() {
|
||||
submitUnbatchedTask("unfollow_action", new ClusterStateUpdateTask(request.masterNodeTimeout()) {
|
||||
|
||||
@Override
|
||||
public ClusterState execute(final ClusterState current) {
|
||||
|
|
|
@ -38,6 +38,10 @@ public class RestCcrStatsAction extends BaseRestHandler {
|
|||
@Override
|
||||
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
|
||||
final CcrStatsAction.Request request = new CcrStatsAction.Request();
|
||||
if (restRequest.hasParam("timeout")) {
|
||||
request.setTimeout(restRequest.paramAsTime("timeout", null));
|
||||
}
|
||||
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
|
||||
return channel -> client.execute(
|
||||
CcrStatsAction.INSTANCE,
|
||||
request,
|
||||
|
|
|
@ -32,6 +32,7 @@ public class RestDeleteAutoFollowPatternAction extends BaseRestHandler {
|
|||
@Override
|
||||
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
|
||||
Request request = new Request(restRequest.param("name"));
|
||||
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
|
||||
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
|
||||
}
|
||||
|
||||
|
|
|
@ -33,6 +33,7 @@ public class RestFollowInfoAction extends BaseRestHandler {
|
|||
@Override
|
||||
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
|
||||
final FollowInfoAction.Request request = new FollowInfoAction.Request();
|
||||
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
|
||||
request.setFollowerIndices(Strings.splitStringByCommaToArray(restRequest.param("index")));
|
||||
return channel -> client.execute(FollowInfoAction.INSTANCE, request, new RestRefCountedChunkedToXContentListener<>(channel));
|
||||
}
|
||||
|
|
|
@ -40,6 +40,9 @@ public class RestFollowStatsAction extends BaseRestHandler {
|
|||
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
|
||||
final FollowStatsAction.StatsRequest request = new FollowStatsAction.StatsRequest();
|
||||
request.setIndices(Strings.splitStringByCommaToArray(restRequest.param("index")));
|
||||
if (restRequest.hasParam("timeout")) {
|
||||
request.setTimeout(restRequest.param("timeout"));
|
||||
}
|
||||
return channel -> client.execute(
|
||||
FollowStatsAction.INSTANCE,
|
||||
request,
|
||||
|
|
|
@ -40,7 +40,11 @@ public class RestForgetFollowerAction extends BaseRestHandler {
|
|||
|
||||
private static Request createRequest(final RestRequest restRequest, final String leaderIndex) throws IOException {
|
||||
try (XContentParser parser = restRequest.contentOrSourceParamParser()) {
|
||||
return Request.fromXContent(parser, leaderIndex);
|
||||
Request request = Request.fromXContent(parser, leaderIndex);
|
||||
if (restRequest.hasParam("timeout")) {
|
||||
request.timeout(restRequest.paramAsTime("timeout", null));
|
||||
}
|
||||
return request;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ public class RestGetAutoFollowPatternAction extends BaseRestHandler {
|
|||
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
|
||||
Request request = new Request();
|
||||
request.setName(restRequest.param("name"));
|
||||
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
|
||||
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
|
||||
}
|
||||
|
||||
|
|
|
@ -32,6 +32,7 @@ public class RestPauseAutoFollowPatternAction extends BaseRestHandler {
|
|||
@Override
|
||||
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
|
||||
Request request = new Request(restRequest.param("name"), false);
|
||||
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
|
||||
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ public class RestPauseFollowAction extends BaseRestHandler {
|
|||
@Override
|
||||
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
|
||||
Request request = new Request(restRequest.param("index"));
|
||||
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
|
||||
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,7 +39,9 @@ public class RestPutAutoFollowPatternAction extends BaseRestHandler {
|
|||
|
||||
private static Request createRequest(RestRequest restRequest) throws IOException {
|
||||
try (XContentParser parser = restRequest.contentOrSourceParamParser()) {
|
||||
return Request.fromXContent(parser, restRequest.param("name"));
|
||||
Request request = Request.fromXContent(parser, restRequest.param("name"));
|
||||
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
|
||||
return request;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,8 +40,11 @@ public class RestPutFollowAction extends BaseRestHandler {
|
|||
|
||||
private static Request createRequest(RestRequest restRequest) throws IOException {
|
||||
try (XContentParser parser = restRequest.contentOrSourceParamParser()) {
|
||||
ActiveShardCount waitForActiveShards = ActiveShardCount.parseString(restRequest.param("wait_for_active_shards"));
|
||||
return Request.fromXContent(parser, restRequest.param("index"), waitForActiveShards);
|
||||
final Request request = Request.fromXContent(parser);
|
||||
request.waitForActiveShards(ActiveShardCount.parseString(restRequest.param("wait_for_active_shards")));
|
||||
request.setFollowerIndex(restRequest.param("index"));
|
||||
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
|
||||
return request;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ public class RestResumeAutoFollowPatternAction extends BaseRestHandler {
|
|||
@Override
|
||||
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
|
||||
Request request = new Request(restRequest.param("name"), true);
|
||||
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
|
||||
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,14 +38,16 @@ public class RestResumeFollowAction extends BaseRestHandler {
|
|||
}
|
||||
|
||||
static Request createRequest(RestRequest restRequest) throws IOException {
|
||||
Request request;
|
||||
if (restRequest.hasContentOrSourceParam()) {
|
||||
try (XContentParser parser = restRequest.contentOrSourceParamParser()) {
|
||||
return Request.fromXContent(parser, restRequest.param("index"));
|
||||
request = Request.fromXContent(parser, restRequest.param("index"));
|
||||
}
|
||||
} else {
|
||||
Request request = new Request();
|
||||
request = new Request();
|
||||
request.setFollowerIndex(restRequest.param("index"));
|
||||
}
|
||||
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
|
||||
return request;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ public class RestUnfollowAction extends BaseRestHandler {
|
|||
@Override
|
||||
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
|
||||
UnfollowAction.Request request = new UnfollowAction.Request(restRequest.param("index"));
|
||||
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
|
||||
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
|
||||
}
|
||||
|
||||
|
|
|
@ -594,6 +594,9 @@ public abstract class CcrIntegTestCase extends ESTestCase {
|
|||
request.getParameters().setMaxReadRequestSize(ByteSizeValue.ofBytes(between(1, 32 * 1024 * 1024)));
|
||||
request.getParameters().setMaxReadRequestOperationCount(between(1, 10000));
|
||||
request.waitForActiveShards(waitForActiveShards);
|
||||
if (randomBoolean()) {
|
||||
request.masterNodeTimeout(randomFrom("10s", "20s", "30s"));
|
||||
}
|
||||
return request;
|
||||
}
|
||||
|
||||
|
@ -602,6 +605,9 @@ public abstract class CcrIntegTestCase extends ESTestCase {
|
|||
request.setFollowerIndex(followerIndex);
|
||||
request.getParameters().setMaxRetryDelay(TimeValue.timeValueMillis(10));
|
||||
request.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(10));
|
||||
if (randomBoolean()) {
|
||||
request.masterNodeTimeout(randomFrom("10s", "20s", "30s"));
|
||||
}
|
||||
return request;
|
||||
}
|
||||
|
||||
|
|
|
@ -103,6 +103,9 @@ public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase {
|
|||
request.setFollowerIndex(followerIndex);
|
||||
request.getParameters().setMaxRetryDelay(TimeValue.timeValueMillis(1));
|
||||
request.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(1));
|
||||
if (randomBoolean()) {
|
||||
request.masterNodeTimeout(randomFrom("10s", "20s", "30s"));
|
||||
}
|
||||
return request;
|
||||
}
|
||||
|
||||
|
@ -114,6 +117,9 @@ public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase {
|
|||
request.getParameters().setMaxRetryDelay(TimeValue.timeValueMillis(1));
|
||||
request.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(1));
|
||||
request.waitForActiveShards(ActiveShardCount.ONE);
|
||||
if (randomBoolean()) {
|
||||
request.masterNodeTimeout(randomFrom("10s", "20s", "30s"));
|
||||
}
|
||||
return request;
|
||||
}
|
||||
|
||||
|
|
|
@ -61,7 +61,10 @@ public class PutFollowActionRequestTests extends AbstractXContentSerializingTest
|
|||
|
||||
@Override
|
||||
protected PutFollowAction.Request doParseInstance(XContentParser parser) throws IOException {
|
||||
return PutFollowAction.Request.fromXContent(parser, "followerIndex", ActiveShardCount.DEFAULT);
|
||||
PutFollowAction.Request request = PutFollowAction.Request.fromXContent(parser);
|
||||
request.waitForActiveShards(ActiveShardCount.DEFAULT);
|
||||
request.setFollowerIndex("followerIndex");
|
||||
return request;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
|
||||
package org.elasticsearch.xpack.core.ccr.action;
|
||||
|
||||
import org.elasticsearch.TransportVersions;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.ActionType;
|
||||
|
@ -16,6 +17,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
|
||||
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
|
||||
import org.elasticsearch.core.TimeValue;
|
||||
import org.elasticsearch.xcontent.ToXContent;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
|
||||
|
||||
|
@ -34,8 +36,13 @@ public class CcrStatsAction extends ActionType<CcrStatsAction.Response> {
|
|||
|
||||
public static class Request extends MasterNodeRequest<Request> {
|
||||
|
||||
private TimeValue timeout;
|
||||
|
||||
public Request(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
if (in.getTransportVersion().onOrAfter(TransportVersions.CCR_STATS_API_TIMEOUT_PARAM)) {
|
||||
timeout = in.readOptionalTimeValue();
|
||||
}
|
||||
}
|
||||
|
||||
public Request() {}
|
||||
|
@ -48,6 +55,43 @@ public class CcrStatsAction extends ActionType<CcrStatsAction.Response> {
|
|||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
if (out.getTransportVersion().onOrAfter(TransportVersions.CCR_STATS_API_TIMEOUT_PARAM)) {
|
||||
out.writeOptionalTimeValue(timeout);
|
||||
}
|
||||
}
|
||||
|
||||
public TimeValue getTimeout() {
|
||||
return this.timeout;
|
||||
}
|
||||
|
||||
public void setTimeout(TimeValue timeout) {
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
public void setTimeout(String timeout) {
|
||||
this.timeout = TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".timeout");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
Request that = (Request) o;
|
||||
return Objects.equals(this.timeout, that.timeout) && Objects.equals(this.masterNodeTimeout, that.masterNodeTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(this.timeout, this.masterNodeTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "CcrStatsAction.Request[timeout=" + timeout + ", masterNodeTimeout=" + masterNodeTimeout + "]";
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -64,13 +64,10 @@ public final class PutFollowAction extends ActionType<PutFollowAction.Response>
|
|||
FollowParameters.initParser(PARSER);
|
||||
}
|
||||
|
||||
public static Request fromXContent(final XContentParser parser, final String followerIndex, ActiveShardCount waitForActiveShards)
|
||||
throws IOException {
|
||||
public static Request fromXContent(final XContentParser parser) throws IOException {
|
||||
PutFollowParameters parameters = PARSER.parse(parser, null);
|
||||
|
||||
Request request = new Request();
|
||||
request.waitForActiveShards(waitForActiveShards);
|
||||
request.setFollowerIndex(followerIndex);
|
||||
request.setRemoteCluster(parameters.remoteCluster);
|
||||
request.setLeaderIndex(parameters.leaderIndex);
|
||||
request.setDataStreamName(parameters.dataStreamName);
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.core.ccr.action;
|
||||
|
||||
import org.elasticsearch.TransportVersions;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||
import org.elasticsearch.test.TransportVersionUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class CcrStatsActionTests extends AbstractWireSerializingTestCase<CcrStatsAction.Request> {
|
||||
|
||||
@Override
|
||||
protected Writeable.Reader<CcrStatsAction.Request> instanceReader() {
|
||||
return CcrStatsAction.Request::new;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CcrStatsAction.Request createTestInstance() {
|
||||
var request = new CcrStatsAction.Request();
|
||||
request.setTimeout(randomFrom("1s", "5s", "10s", "15s"));
|
||||
request.masterNodeTimeout(randomFrom("1s", "5s", "10s", "15s"));
|
||||
return request;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CcrStatsAction.Request mutateInstance(CcrStatsAction.Request instance) throws IOException {
|
||||
return switch (randomInt(1)) {
|
||||
case 0 -> {
|
||||
var mutatedInstance = new CcrStatsAction.Request();
|
||||
mutatedInstance.setTimeout(instance.getTimeout());
|
||||
mutatedInstance.masterNodeTimeout(randomFrom("20s", "25s", "30s"));
|
||||
yield mutatedInstance;
|
||||
}
|
||||
case 1 -> {
|
||||
var mutatedInstance = new CcrStatsAction.Request();
|
||||
mutatedInstance.setTimeout(randomFrom("20s", "25s", "30s"));
|
||||
mutatedInstance.masterNodeTimeout(instance.masterNodeTimeout());
|
||||
yield mutatedInstance;
|
||||
}
|
||||
default -> throw new RuntimeException("Cannot happen");
|
||||
};
|
||||
}
|
||||
|
||||
public void testSerializationBwc() throws IOException {
|
||||
// In previous version `timeout` is not set
|
||||
var request = new CcrStatsAction.Request();
|
||||
if (randomBoolean()) {
|
||||
request.masterNodeTimeout(randomFrom("20s", "25s", "30s"));
|
||||
}
|
||||
assertSerialization(request, TransportVersionUtils.getPreviousVersion(TransportVersions.CCR_STATS_API_TIMEOUT_PARAM));
|
||||
assertSerialization(request, TransportVersions.MINIMUM_CCS_VERSION);
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue