Merge remote-tracking branch 'upstream/main' into lucene_snapshot_9_11

This commit is contained in:
Benjamin Trent 2024-06-12 08:05:36 -04:00
commit 08298dcd69
266 changed files with 5713 additions and 4260 deletions

View file

@ -1,14 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
esplugin {
description 'A test module that tracks seeks in lucene Directories'
classname 'org.elasticsearch.test.seektracker.SeekTrackerPlugin'
}
apply plugin: 'elasticsearch.internal-cluster-test'

View file

@ -1,55 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.test.seektracker;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.greaterThan;
public class SeekTrackerPluginIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(SeekTrackerPlugin.class);
}
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(SeekTrackerPlugin.SEEK_TRACKING_ENABLED.getKey(), "true")
.build();
}
public void testSeekTrackerPlugin() throws InterruptedException {
assertAcked(indicesAdmin().prepareCreate("index"));
List<IndexRequestBuilder> docs = new ArrayList<>();
for (int i = 0; i < 100; i++) {
docs.add(prepareIndex("index").setSource("field", "term" + i % 5));
}
indexRandom(true, docs);
prepareSearch("index").setQuery(QueryBuilders.termQuery("field", "term2")).get().decRef();
SeekStatsResponse response = client().execute(SeekTrackerPlugin.SEEK_STATS_ACTION, new SeekStatsRequest("index")).actionGet();
List<ShardSeekStats> shardSeekStats = response.getSeekStats().get("index");
assertThat(shardSeekStats.size(), greaterThan(0));
}
}

View file

@ -1,51 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.test.seektracker;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
public class IndexSeekTracker {
private final String index;
private final Map<String, Map<String, LongAdder>> seeks = new HashMap<>();
public IndexSeekTracker(String index) {
this.index = index;
}
public void track(String shard) {
seeks.computeIfAbsent(shard, k -> new ConcurrentHashMap<>()); // increment can be called by multiple threads
}
public void increment(String shard, String file) {
seeks.get(shard).computeIfAbsent(file, s -> new LongAdder()).increment();
}
public List<ShardSeekStats> getSeeks() {
List<ShardSeekStats> values = new ArrayList<>();
seeks.forEach((k, v) -> values.add(getSeeksForShard(k)));
return values;
}
private ShardSeekStats getSeeksForShard(String shard) {
Map<String, Long> seeksPerFile = new HashMap<>();
seeks.get(shard).forEach((name, adder) -> seeksPerFile.put(name, adder.longValue()));
return new ShardSeekStats(shard, seeksPerFile);
}
@Override
public String toString() {
return "seeks for " + index + ": " + seeks;
}
}

View file

@ -1,52 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.test.seektracker;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class NodeSeekStats extends BaseNodeResponse implements ToXContentFragment {
private final Map<String, List<ShardSeekStats>> seeks;
public NodeSeekStats(DiscoveryNode node, Map<String, List<ShardSeekStats>> seeks) {
super(node);
this.seeks = seeks;
}
public NodeSeekStats(StreamInput in) throws IOException {
super(in);
this.seeks = in.readMap(s -> s.readCollectionAsList(ShardSeekStats::new));
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(seeks, StreamOutput::writeCollection);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.mapContents(seeks);
return builder;
}
public Map<String, List<ShardSeekStats>> getSeekStats() {
return seeks;
}
}

View file

@ -1,41 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.test.seektracker;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import java.util.List;
public class RestSeekStatsAction extends BaseRestHandler {
@Override
public String getName() {
return "seek_stats_action";
}
@Override
public List<Route> routes() {
return List.of(
new RestHandler.Route(RestRequest.Method.GET, "/_seek_stats"),
new RestHandler.Route(RestRequest.Method.GET, "/{index}/_seek_stats")
);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
String[] indices = request.paramAsStringArray("index", Strings.EMPTY_ARRAY);
SeekStatsRequest seekStatsRequest = new SeekStatsRequest(indices);
return channel -> client.execute(SeekTrackerPlugin.SEEK_STATS_ACTION, seekStatsRequest, new RestToXContentListener<>(channel));
}
}

View file

@ -1,42 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.test.seektracker;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
public class SeekStatsRequest extends BaseNodesRequest<SeekStatsRequest> {
private final String[] indices;
public SeekStatsRequest(String... indices) {
super(Strings.EMPTY_ARRAY);
this.indices = indices;
}
public SeekStatsRequest(StreamInput in) throws IOException {
super(in);
this.indices = in.readStringArray();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
}
public String[] getIndices() {
return indices;
}
}

View file

@ -1,69 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.test.seektracker;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class SeekStatsResponse extends BaseNodesResponse<NodeSeekStats> implements ToXContentObject {
public SeekStatsResponse(ClusterName clusterName, List<NodeSeekStats> seekStats, List<FailedNodeException> failures) {
super(clusterName, seekStats, failures);
}
@Override
protected List<NodeSeekStats> readNodesFrom(StreamInput in) {
return TransportAction.localOnly();
}
@Override
protected void writeNodesTo(StreamOutput out, List<NodeSeekStats> nodes) {
TransportAction.localOnly();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
for (NodeSeekStats seekStats : getNodes()) {
builder.startObject(seekStats.getNode().getId());
seekStats.toXContent(builder, params);
builder.endObject();
}
builder.endObject();
return builder;
}
public Map<String, List<ShardSeekStats>> getSeekStats() {
Map<String, List<ShardSeekStats>> combined = new HashMap<>();
for (NodeSeekStats nodeSeekStats : getNodes()) {
nodeSeekStats.getSeekStats()
.forEach((index, shardSeekStats) -> combined.computeIfAbsent(index, k -> new ArrayList<>()).addAll(shardSeekStats));
}
return combined;
}
@Override
public String toString() {
return Strings.toString(this);
}
}

View file

@ -1,30 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.test.seektracker;
import java.util.HashMap;
import java.util.Map;
public class SeekStatsService {
private final Map<String, IndexSeekTracker> seeks = new HashMap<>();
public IndexSeekTracker registerIndex(String index) {
return seeks.computeIfAbsent(index, IndexSeekTracker::new);
}
public Map<String, IndexSeekTracker> getSeekStats() {
return seeks;
}
public IndexSeekTracker getSeekStats(String index) {
return seeks.get(index);
}
}

View file

@ -1,100 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.test.seektracker;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
public class SeekTrackerPlugin extends Plugin implements ActionPlugin {
/** Setting for enabling or disabling seek tracking. Defaults to false. */
public static final Setting<Boolean> SEEK_TRACKING_ENABLED = Setting.boolSetting(
"seektracker.enabled",
false,
Setting.Property.NodeScope
);
public static final ActionType<SeekStatsResponse> SEEK_STATS_ACTION = new ActionType<>("cluster:monitor/seek_stats");
private final SeekStatsService seekStatsService = new SeekStatsService();
private final boolean enabled;
public SeekTrackerPlugin(Settings settings) {
this.enabled = SEEK_TRACKING_ENABLED.get(settings);
}
@Override
public List<Setting<?>> getSettings() {
return List.of(SEEK_TRACKING_ENABLED);
}
@Override
public Collection<?> createComponents(PluginServices services) {
return Collections.singletonList(seekStatsService);
}
// seeks per index/shard/file
@Override
public void onIndexModule(IndexModule indexModule) {
if (enabled) {
IndexSeekTracker seekTracker = seekStatsService.registerIndex(indexModule.getIndex().getName());
indexModule.setDirectoryWrapper(new SeekTrackingDirectoryWrapper(seekTracker));
}
}
@Override
public List<RestHandler> getRestHandlers(
Settings settings,
NamedWriteableRegistry namedWriteableRegistry,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster,
Predicate<NodeFeature> clusterSupportsFeature
) {
if (enabled) {
return Collections.singletonList(new RestSeekStatsAction());
} else {
return Collections.emptyList();
}
}
@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
if (enabled) {
return Collections.singletonList(new ActionHandler<>(SEEK_STATS_ACTION, TransportSeekStatsAction.class));
} else {
return Collections.emptyList();
}
}
}

View file

@ -1,269 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.test.seektracker;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.RandomAccessInput;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.IndexModule;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
public class SeekTrackingDirectoryWrapper implements IndexModule.DirectoryWrapper {
private final IndexSeekTracker seekTracker;
public SeekTrackingDirectoryWrapper(IndexSeekTracker seekTracker) {
this.seekTracker = seekTracker;
}
@Override
public Directory wrap(Directory directory, ShardRouting shardRouting) {
seekTracker.track(shardRouting.shardId().toString());
return new FilterDirectory(directory) {
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
IndexInput input = super.openInput(name, context);
if (input instanceof RandomAccessInput) {
return new RandomAccessSeekCountingIndexInput(input, shardRouting.shardId().toString(), name);
}
return wrapIndexInput(shardRouting.shardId().toString(), name, input);
}
};
}
private IndexInput wrapIndexInput(String directory, String name, IndexInput in) {
return new SeekCountingIndexInput(in, directory, name);
}
class RandomAccessSeekCountingIndexInput extends SeekCountingIndexInput implements RandomAccessInput {
private final RandomAccessInput randomAccessInput;
RandomAccessSeekCountingIndexInput(IndexInput in, String directory, String name) {
super(in, directory, name);
randomAccessInput = (RandomAccessInput) unwrap(in);
}
@Override
public IndexInput clone() {
return new RandomAccessSeekCountingIndexInput(super.clone(), directory, name);
}
@Override
public byte readByte(long pos) throws IOException {
return randomAccessInput.readByte(pos);
}
@Override
public short readShort(long pos) throws IOException {
return randomAccessInput.readShort(pos);
}
@Override
public int readInt(long pos) throws IOException {
return randomAccessInput.readInt(pos);
}
@Override
public long readLong(long pos) throws IOException {
return randomAccessInput.readLong(pos);
}
}
class SeekCountingIndexInput extends IndexInput {
public static IndexInput unwrap(IndexInput input) {
while (input instanceof SeekCountingIndexInput) {
input = ((SeekCountingIndexInput) input).in;
}
return input;
}
final IndexInput in;
final String directory;
final String name;
SeekCountingIndexInput(IndexInput in, String directory, String name) {
super(unwrap(in).toString() + "[seek_tracked]");
this.in = unwrap(in);
this.directory = directory;
this.name = name;
}
@Override
public IndexInput clone() {
return new SeekCountingIndexInput(in.clone(), directory, name);
}
@Override
public void close() throws IOException {
in.close();
}
@Override
public long getFilePointer() {
return in.getFilePointer();
}
@Override
public void seek(long pos) throws IOException {
in.seek(pos);
seekTracker.increment(directory, name);
}
@Override
public long length() {
return in.length();
}
@Override
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
return wrapIndexInput(directory, name, in.slice(sliceDescription + "[seek_tracked]", offset, length));
}
@Override
public RandomAccessInput randomAccessSlice(long offset, long length) throws IOException {
final IndexInput innerSlice = in.slice("randomaccess", offset, length);
if (innerSlice instanceof RandomAccessInput) {
// slice() already supports random access
return new RandomAccessSeekCountingIndexInput(innerSlice, directory, name);
} else {
IndexInput slice = wrapIndexInput(directory, name, innerSlice);
// return default impl
return new RandomAccessInput() {
@Override
public long length() {
return slice.length();
}
@Override
public byte readByte(long pos) throws IOException {
slice.seek(pos);
return slice.readByte();
}
@Override
public short readShort(long pos) throws IOException {
slice.seek(pos);
return slice.readShort();
}
@Override
public int readInt(long pos) throws IOException {
slice.seek(pos);
return slice.readInt();
}
@Override
public long readLong(long pos) throws IOException {
slice.seek(pos);
return slice.readLong();
}
@Override
public String toString() {
return "RandomAccessInput(" + slice + ")";
}
};
}
}
@Override
public byte readByte() throws IOException {
return in.readByte();
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
in.readBytes(b, offset, len);
}
@Override
public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
in.readBytes(b, offset, len, useBuffer);
}
@Override
public short readShort() throws IOException {
return in.readShort();
}
@Override
public int readInt() throws IOException {
return in.readInt();
}
@Override
public int readVInt() throws IOException {
return in.readVInt();
}
@Override
public int readZInt() throws IOException {
return in.readZInt();
}
@Override
public long readLong() throws IOException {
return in.readLong();
}
@Override
public long readVLong() throws IOException {
return in.readVLong();
}
@Override
public long readZLong() throws IOException {
return in.readZLong();
}
@Override
public String readString() throws IOException {
return in.readString();
}
@Override
public Map<String, String> readMapOfStrings() throws IOException {
return in.readMapOfStrings();
}
@Override
public Set<String> readSetOfStrings() throws IOException {
return in.readSetOfStrings();
}
@Override
public void skipBytes(long numBytes) throws IOException {
in.skipBytes(numBytes);
}
@Override
public void readFloats(float[] floats, int offset, int len) throws IOException {
in.readFloats(floats, offset, len);
}
@Override
public void readLongs(long[] dst, int offset, int length) throws IOException {
in.readLongs(dst, offset, length);
}
@Override
public void readInts(int[] dst, int offset, int length) throws IOException {
in.readInts(dst, offset, length);
}
}
}

View file

@ -1,36 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.test.seektracker;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Map;
public record ShardSeekStats(String shard, Map<String, Long> seeksPerFile) implements Writeable, ToXContentObject {
public ShardSeekStats(StreamInput in) throws IOException {
this(in.readString(), in.readMap(StreamInput::readLong));
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(this.shard);
out.writeMap(this.seeksPerFile, StreamOutput::writeLong);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().field("shard", this.shard).field("seeks", seeksPerFile).endObject();
}
}

View file

@ -1,83 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.test.seektracker;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class TransportSeekStatsAction extends TransportNodesAction<SeekStatsRequest, SeekStatsResponse, SeekStatsRequest, NodeSeekStats> {
private final SeekStatsService seekStatsService;
@Inject
public TransportSeekStatsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
SeekStatsService seekStatsService
) {
super(
SeekTrackerPlugin.SEEK_STATS_ACTION.name(),
clusterService,
transportService,
actionFilters,
SeekStatsRequest::new,
threadPool.executor(ThreadPool.Names.MANAGEMENT)
);
this.seekStatsService = seekStatsService;
}
@Override
protected SeekStatsResponse newResponse(SeekStatsRequest request, List<NodeSeekStats> seekStats, List<FailedNodeException> failures) {
return new SeekStatsResponse(clusterService.getClusterName(), seekStats, failures);
}
@Override
protected SeekStatsRequest newNodeRequest(SeekStatsRequest request) {
// TODO don't wrap the whole top-level request, it contains heavy and irrelevant DiscoveryNode things; see #100878
return request;
}
@Override
protected NodeSeekStats newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeSeekStats(in);
}
@Override
protected NodeSeekStats nodeOperation(SeekStatsRequest request, Task task) {
Map<String, List<ShardSeekStats>> seeks = new HashMap<>();
if (request.getIndices().length == 0) {
for (Map.Entry<String, IndexSeekTracker> entry : seekStatsService.getSeekStats().entrySet()) {
seeks.put(entry.getKey(), entry.getValue().getSeeks());
}
} else {
for (String index : request.getIndices()) {
IndexSeekTracker indexSeekTracker = seekStatsService.getSeekStats(index);
if (indexSeekTracker != null) {
seeks.put(index, indexSeekTracker.getSeeks());
}
}
}
return new NodeSeekStats(clusterService.localNode(), seeks);
}
}