Add node level cache stats for searchable snapshots (#71701)

This commit adds node-level statistics about the searchable 
snapshots shared cache that can be retrieved using the REST 
endpoint `GET /_searchable_snapshots/cache/stats`.

The returned information looks like this:
{
  "nodes" : {
    "eerrtBMtQEisohZzxBLUSw" : {
      "shared_cache" : {
        "reads" : 6051,
        "bytes_read" : "5.1mb",
        "bytes_read_in_bytes" : 5448829,
        "writes" : 37,
        "bytes_written" : "1.1mb",
        "bytes_written_in_bytes" : 1208320,
        "evictions" : 5,
        "num_regions" : 32,
        "size" : "1mb",
        "size_in_bytes" : 1048576,
        "region_size" : "32kb",
        "region_size_in_bytes" : 32768
      }
    }
  }
}
Tanguy Leroux 2021-04-19 17:02:27 +02:00 committed by GitHub
parent 6dbad503d9
commit ceaa16eddc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 760 additions and 20 deletions

View file

@ -0,0 +1,136 @@
[role="xpack"]
[testenv="enterprise"]
[[searchable-snapshots-api-cache-stats]]
=== Cache stats API
++++
<titleabbrev>Cache stats</titleabbrev>
++++
Retrieves statistics about the searchable snapshots <<shared-cache,shared cache>>.
[[searchable-snapshots-api-cache-stats-request]]
==== {api-request-title}
`GET /_searchable_snapshots/cache/stats` +
`GET /_searchable_snapshots/<node_id>/cache/stats`
[[searchable-snapshots-api-cache-stats-prereqs]]
==== {api-prereq-title}
If the {es} {security-features} are enabled, you must have the
`manage` cluster privilege to use this API.
For more information, see <<security-privileges>>.
[[searchable-snapshots-api-cache-stats-desc]]
==== {api-description-title}
You can use the Cache Stats API to retrieve statistics about the
usage of the <<shared-cache,shared cache>> on nodes in a cluster.
[[searchable-snapshots-api-cache-stats-path-params]]
==== {api-path-parms-title}
`<node_id>`::
(Optional, string) The names of particular nodes in the cluster to target.
For example, `nodeId1,nodeId2`. For node selection options, see
<<cluster-nodes>>.
[[searchable-snapshots-api-cache-stats-query-params]]
==== {api-query-parms-title}
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
[role="child_attributes"]
[[searchable-snapshots-api-cache-stats-response-body]]
==== {api-response-body-title}
`nodes`::
(object)
Contains statistics for the nodes selected by the request.
+
.Properties of `nodes`
[%collapsible%open]
====
`<node_id>`::
(object)
Contains statistics for the node with the given identifier.
+
.Properties of `<node_id>`
[%collapsible%open]
=====
`shared_cache`::
(object)
Contains statistics about the shared cache file.
+
.Properties of `shared_cache`
[%collapsible%open]
======
`reads`::
(long) Number of times data is read from the shared cache.
`bytes_read_in_bytes`::
(long) Total number of bytes read from the shared cache.
`writes`::
(long) Number of times data from the blob store repository is written to the shared cache.
`bytes_written_in_bytes`::
(long) Total number of bytes written to the shared cache.
`evictions`::
(long) Number of regions evicted from the shared cache file.
`num_regions`::
(integer) Number of regions in the shared cache file.
`size_in_bytes`::
(long) The total size in bytes of the shared cache file.
`region_size_in_bytes`::
(long) The size in bytes of a region in the shared cache file.
======
=====
====
[[searchable-snapshots-api-cache-stats-example]]
==== {api-examples-title}
Retrieves the searchable snapshots shared cache file statistics for all data nodes:
[source,console]
--------------------------------------------------
GET /_searchable_snapshots/cache/stats
--------------------------------------------------
// TEST[setup:node]
The API returns the following response:
[source,console-result]
----
{
"nodes" : {
"eerrtBMtQEisohZzxBLUSw" : {
"shared_cache" : {
"reads" : 6051,
"bytes_read_in_bytes" : 5448829,
"writes" : 37,
"bytes_written_in_bytes" : 1208320,
"evictions" : 5,
"num_regions" : 65536,
"size_in_bytes" : 1099511627776,
"region_size_in_bytes" : 16777216
}
}
}
}
----
// TESTRESPONSE[s/"reads" : 6051/"reads" : 0/]
// TESTRESPONSE[s/"bytes_read_in_bytes" : 5448829/"bytes_read_in_bytes" : 0/]
// TESTRESPONSE[s/"writes" : 37/"writes" : 0/]
// TESTRESPONSE[s/"bytes_written_in_bytes" : 1208320/"bytes_written_in_bytes" : 0/]
// TESTRESPONSE[s/"evictions" : 5/"evictions" : 0/]
// TESTRESPONSE[s/"num_regions" : 65536/"num_regions" : 0/]
// TESTRESPONSE[s/"size_in_bytes" : 1099511627776/"size_in_bytes" : 0/]
// TESTRESPONSE[s/"eerrtBMtQEisohZzxBLUSw"/\$node_name/]
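
The sample values in this response are internally consistent: 65536 regions of 16777216 bytes each give 1099511627776 bytes. The `FrozenCacheService` constructor changed in this commit derives the region count as `Math.toIntExact(cacheSize / regionSize)`. A minimal sketch of that arithmetic follows; the class and method names are hypothetical, and a positive region size is assumed.

```java
/** Hypothetical helper, not part of the commit: mirrors the region-count arithmetic. */
final class RegionMath {
    private RegionMath() {}

    /**
     * Number of whole regions that fit in the cache.
     * Integer division drops any remainder, and Math.toIntExact throws
     * ArithmeticException if the result does not fit in an int.
     * Assumes regionSizeBytes > 0.
     */
    static int numRegions(long cacheSizeBytes, long regionSizeBytes) {
        return Math.toIntExact(cacheSizeBytes / regionSizeBytes);
    }
}
```

For the values shown above, `RegionMath.numRegions(1099511627776L, 16777216L)` yields 65536. The figures in the commit message (1048576 / 32768 = 32) and in the YAML test (16777216 / 262144 = 64) follow the same relation.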

View file

@ -6,5 +6,7 @@
You can use the following APIs to perform searchable snapshots operations.
* <<searchable-snapshots-api-mount-snapshot,Mount snapshot>>
* <<searchable-snapshots-api-cache-stats,Cache statistics>>
include::mount-snapshot.asciidoc[]
include::node-cache-stats.asciidoc[]

View file

@ -0,0 +1,35 @@
{
"searchable_snapshots.cache_stats": {
"documentation": {
"url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/searchable-snapshots-apis.html",
"description": "Retrieve node-level cache statistics about searchable snapshots."
},
"stability": "experimental",
"visibility":"public",
"headers":{
"accept": [ "application/json"]
},
"url": {
"paths": [
{
"path": "/_searchable_snapshots/cache/stats",
"methods": [
"GET"
]
},
{
"path": "/_searchable_snapshots/{node_id}/cache/stats",
"methods": [
"GET"
],
"parts":{
"node_id":{
"type":"list",
"description":"A comma-separated list of node IDs or names to limit the returned information; use `_local` to return information from the node you're connecting to, leave empty to get information from all nodes"
}
}
}
]
}
}
}
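
The specification above declares two paths, one without `node_id` and one where `node_id` is a comma-separated list. As a rough illustration of how a caller might pick between them, the sketch below builds the request path from an optional list of node identifiers. The class and method names are hypothetical, and the identifiers are assumed to need no URL encoding.

```java
import java.util.List;

/** Hypothetical helper, not part of the commit: chooses one of the two declared paths. */
final class CacheStatsPaths {
    private CacheStatsPaths() {}

    /**
     * Returns the path without node_id when no nodes are given,
     * otherwise the path with the identifiers joined by commas.
     * Assumes the identifiers are already URL-safe.
     */
    static String cacheStatsPath(List<String> nodeIds) {
        if (nodeIds == null || nodeIds.isEmpty()) {
            return "/_searchable_snapshots/cache/stats";
        }
        return "/_searchable_snapshots/" + String.join(",", nodeIds) + "/cache/stats";
    }
}
```

Passing `nodeId1` and `nodeId2` produces `/_searchable_snapshots/nodeId1,nodeId2/cache/stats`, which matches the form used in the API documentation added by this commit.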

View file

@ -10,7 +10,7 @@ final File repoDir = file("$buildDir/testclusters/repo")
restResources { restResources {
restApi { restApi {
include 'indices', 'search', 'bulk', 'snapshot', 'nodes', '_common', 'searchable_snapshots' include 'indices', 'search', 'bulk', 'snapshot', 'nodes', '_common', 'searchable_snapshots', 'cluster'
} }
} }

View file

@ -0,0 +1,108 @@
---
setup:
- skip:
version: " - 7.99.99"
reason: node-level cache statistics added in 8.0.0
- do:
indices.create:
index: docs
body:
settings:
number_of_shards: 1
number_of_replicas: 0
- do:
bulk:
body:
- index:
_index: docs
_id: 1
- field: foo
- index:
_index: docs
_id: 2
- field: bar
- index:
_index: docs
_id: 3
- field: baz
- do:
snapshot.create_repository:
repository: repository-fs
body:
type: fs
settings:
location: "repository-fs"
# Remove the snapshot if a previous test failed to delete it.
# Useful for third-party tests that run the tests against a real external service.
- do:
snapshot.delete:
repository: repository-fs
snapshot: snapshot
ignore: 404
- do:
snapshot.create:
repository: repository-fs
snapshot: snapshot
wait_for_completion: true
- do:
indices.delete:
index: docs
---
teardown:
- do:
snapshot.delete:
repository: repository-fs
snapshot: snapshot
ignore: 404
- do:
snapshot.delete_repository:
repository: repository-fs
---
"Node Cache Stats API with Frozen Indices":
- do:
cluster.state: {}
- set: { master_node: node_id }
- do:
searchable_snapshots.mount:
repository: repository-fs
snapshot: snapshot
wait_for_completion: true
storage: shared_cache
body:
index: docs
renamed_index: frozen-docs
- match: { snapshot.snapshot: snapshot }
- match: { snapshot.shards.failed: 0 }
- match: { snapshot.shards.successful: 1 }
- do:
searchable_snapshots.cache_stats:
human: true
- is_true: nodes.$node_id
- is_true: nodes.$node_id.shared_cache
- match: { nodes.$node_id.shared_cache.reads: 0 }
- match: { nodes.$node_id.shared_cache.bytes_read: "0b" }
- match: { nodes.$node_id.shared_cache.bytes_read_in_bytes: 0 }
- match: { nodes.$node_id.shared_cache.writes: 0 }
- match: { nodes.$node_id.shared_cache.bytes_written: "0b" }
- match: { nodes.$node_id.shared_cache.bytes_written_in_bytes: 0 }
- match: { nodes.$node_id.shared_cache.evictions: 0 }
- match: { nodes.$node_id.shared_cache.num_regions: 64 }
- match: { nodes.$node_id.shared_cache.size: "16mb" }
- match: { nodes.$node_id.shared_cache.size_in_bytes: 16777216 }
- match: { nodes.$node_id.shared_cache.region_size: "256kb" }
- match: { nodes.$node_id.shared_cache.region_size_in_bytes: 262144 }
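
With `human: true` the test expects both the raw byte counts and strings such as `"0b"`, `"256kb"` and `"16mb"`. The response code in this commit produces them through `ByteSizeValue`. The sketch below is not that implementation. It is only a hypothetical helper (`human`) that reproduces the strings appearing in this commit, assuming binary units, truncation to one decimal place, and non-negative input.

```java
/** Hypothetical formatter, not Elasticsearch's ByteSizeValue. */
final class HumanBytes {
    private static final String[] UNITS = { "b", "kb", "mb", "gb", "tb", "pb" };

    private HumanBytes() {}

    /**
     * Formats a non-negative byte count using 1024-based units.
     * At most one decimal digit is kept, truncated rather than rounded,
     * and a zero decimal is omitted.
     */
    static String human(long bytes) {
        long divisor = 1L;
        int unit = 0;
        // Move to the largest unit that does not exceed the value.
        while (unit < UNITS.length - 1 && bytes >= divisor * 1024) {
            divisor *= 1024;
            unit++;
        }
        long whole = bytes / divisor;
        long tenths = (bytes % divisor) * 10 / divisor;
        return tenths == 0 ? whole + UNITS[unit] : whole + "." + tenths + UNITS[unit];
    }
}
```

Under these assumptions, `human(16777216)` gives `16mb` and `human(262144)` gives `256kb`, the two size strings asserted in the test above.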

View file

@ -12,6 +12,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.xpack.searchablesnapshots.action.cache.TransportSearchableSnapshotsNodeCachesStatsAction;
import org.elasticsearch.xpack.searchablesnapshots.allocation.decider.DedicatedFrozenNodeAllocationDecider; import org.elasticsearch.xpack.searchablesnapshots.allocation.decider.DedicatedFrozenNodeAllocationDecider;
import org.elasticsearch.xpack.searchablesnapshots.cache.blob.BlobStoreCacheService; import org.elasticsearch.xpack.searchablesnapshots.cache.blob.BlobStoreCacheService;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
@ -43,6 +44,7 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.FrozenEngine; import org.elasticsearch.index.engine.FrozenEngine;
import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.engine.ReadOnlyEngine;
import org.elasticsearch.xpack.searchablesnapshots.rest.RestSearchableSnapshotsNodeCachesStatsAction;
import org.elasticsearch.xpack.searchablesnapshots.store.SearchableSnapshotDirectory; import org.elasticsearch.xpack.searchablesnapshots.store.SearchableSnapshotDirectory;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
@ -343,6 +345,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
PersistentCache.cleanUp(settings, nodeEnvironment); PersistentCache.cleanUp(settings, nodeEnvironment);
} }
this.allocator.set(new SearchableSnapshotAllocator(client, clusterService.getRerouteService(), frozenCacheInfoService)); this.allocator.set(new SearchableSnapshotAllocator(client, clusterService.getRerouteService(), frozenCacheInfoService));
components.add(new FrozenCacheServiceSupplier(frozenCacheService.get()));
components.add(new CacheServiceSupplier(cacheService.get())); components.add(new CacheServiceSupplier(cacheService.get()));
return Collections.unmodifiableList(components); return Collections.unmodifiableList(components);
} }
@ -479,7 +482,11 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
new ActionHandler<>(XPackInfoFeatureAction.SEARCHABLE_SNAPSHOTS, SearchableSnapshotsInfoTransportAction.class), new ActionHandler<>(XPackInfoFeatureAction.SEARCHABLE_SNAPSHOTS, SearchableSnapshotsInfoTransportAction.class),
new ActionHandler<>(TransportSearchableSnapshotCacheStoresAction.TYPE, TransportSearchableSnapshotCacheStoresAction.class), new ActionHandler<>(TransportSearchableSnapshotCacheStoresAction.TYPE, TransportSearchableSnapshotCacheStoresAction.class),
new ActionHandler<>(FrozenCacheInfoAction.INSTANCE, FrozenCacheInfoAction.TransportAction.class), new ActionHandler<>(FrozenCacheInfoAction.INSTANCE, FrozenCacheInfoAction.TransportAction.class),
new ActionHandler<>(FrozenCacheInfoNodeAction.INSTANCE, FrozenCacheInfoNodeAction.TransportAction.class) new ActionHandler<>(FrozenCacheInfoNodeAction.INSTANCE, FrozenCacheInfoNodeAction.TransportAction.class),
new ActionHandler<>(
TransportSearchableSnapshotsNodeCachesStatsAction.TYPE,
TransportSearchableSnapshotsNodeCachesStatsAction.class
)
); );
} }
@ -495,7 +502,8 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
return List.of( return List.of(
new RestSearchableSnapshotsStatsAction(), new RestSearchableSnapshotsStatsAction(),
new RestClearSearchableSnapshotsCacheAction(), new RestClearSearchableSnapshotsCacheAction(),
new RestMountSearchableSnapshotAction() new RestMountSearchableSnapshotAction(),
new RestSearchableSnapshotsNodeCachesStatsAction()
); );
} }
@ -656,6 +664,9 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
Releasables.close(frozenCacheService.get()); Releasables.close(frozenCacheService.get());
} }
/**
* Allows injecting the {@link CacheService} instance into transport actions
*/
public static final class CacheServiceSupplier implements Supplier<CacheService> { public static final class CacheServiceSupplier implements Supplier<CacheService> {
@Nullable @Nullable
@ -670,4 +681,22 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
return cacheService; return cacheService;
} }
} }
/**
* Allows injecting the {@link FrozenCacheService} instance into transport actions
*/
public static final class FrozenCacheServiceSupplier implements Supplier<FrozenCacheService> {
@Nullable
private final FrozenCacheService frozenCacheService;
FrozenCacheServiceSupplier(@Nullable FrozenCacheService frozenCacheService) {
this.frozenCacheService = frozenCacheService;
}
@Override
public FrozenCacheService get() {
return frozenCacheService;
}
}
} }
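
`FrozenCacheServiceSupplier` wraps a `@Nullable` service in a `Supplier`, which lets the plugin register it as a component even when no service instance exists, leaving the null check to the consumer. A minimal sketch of the same pattern, using a hypothetical `Service` type and a hypothetical `describeOrDefault` consumer in place of the real classes:

```java
import java.util.function.Supplier;

/** Hypothetical illustration of a supplier around a possibly-null service. */
final class NullableSupplierSketch {
    private NullableSupplierSketch() {}

    /** Stand-in for the real service; not an Elasticsearch class. */
    static final class Service {
        String describe() {
            return "service-present";
        }
    }

    /** Holds a service reference that may be null and hands it out on demand. */
    static final class ServiceSupplier implements Supplier<Service> {
        private final Service service; // may be null

        ServiceSupplier(Service service) {
            this.service = service;
        }

        @Override
        public Service get() {
            return service;
        }
    }

    /** The consumer performs the null check and picks a fallback value. */
    static String describeOrDefault(Supplier<Service> supplier) {
        Service service = supplier.get();
        return service != null ? service.describe() : "service-absent";
    }
}
```

The transport action added in this commit follows the same shape: it calls `get()` on the injected supplier and falls back to `FrozenCacheService.Stats.EMPTY` when the result is null.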

View file

@ -0,0 +1,281 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.searchablesnapshots.action.cache;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
import org.elasticsearch.xpack.searchablesnapshots.cache.shared.FrozenCacheService;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* Node level stats about searchable snapshots caches.
*/
public class TransportSearchableSnapshotsNodeCachesStatsAction extends TransportNodesAction<
TransportSearchableSnapshotsNodeCachesStatsAction.NodesRequest,
TransportSearchableSnapshotsNodeCachesStatsAction.NodesCachesStatsResponse,
TransportSearchableSnapshotsNodeCachesStatsAction.NodeRequest,
TransportSearchableSnapshotsNodeCachesStatsAction.NodeCachesStatsResponse> {
public static final String ACTION_NAME = "cluster:admin/xpack/searchable_snapshots/cache/stats";
public static final ActionType<NodesCachesStatsResponse> TYPE = new ActionType<>(ACTION_NAME, NodesCachesStatsResponse::new);
private final Supplier<FrozenCacheService> frozenCacheService;
@Inject
public TransportSearchableSnapshotsNodeCachesStatsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
SearchableSnapshots.FrozenCacheServiceSupplier frozenCacheService
) {
super(
ACTION_NAME,
threadPool,
clusterService,
transportService,
actionFilters,
NodesRequest::new,
NodeRequest::new,
ThreadPool.Names.MANAGEMENT,
ThreadPool.Names.SAME,
NodeCachesStatsResponse.class
);
this.frozenCacheService = frozenCacheService;
}
@Override
protected NodesCachesStatsResponse newResponse(
NodesRequest request,
List<NodeCachesStatsResponse> responses,
List<FailedNodeException> failures
) {
return new NodesCachesStatsResponse(clusterService.getClusterName(), responses, failures);
}
@Override
protected NodeRequest newNodeRequest(NodesRequest request) {
return new NodeRequest();
}
@Override
protected NodeCachesStatsResponse newNodeResponse(StreamInput in) throws IOException {
return new NodeCachesStatsResponse(in);
}
@Override
protected void resolveRequest(NodesRequest request, ClusterState clusterState) {
final ImmutableOpenMap<String, DiscoveryNode> dataNodes = clusterState.getNodes().getDataNodes();
final DiscoveryNode[] resolvedNodes;
if (request.nodesIds() == null || request.nodesIds().length == 0) {
resolvedNodes = dataNodes.values().toArray(DiscoveryNode.class);
} else {
resolvedNodes = Arrays.stream(request.nodesIds())
.filter(dataNodes::containsKey)
.map(dataNodes::get)
.collect(Collectors.toList())
.toArray(DiscoveryNode[]::new);
}
request.setConcreteNodes(resolvedNodes);
}
@Override
protected NodeCachesStatsResponse nodeOperation(NodeRequest request, Task task) {
final FrozenCacheService.Stats frozenCacheStats;
if (frozenCacheService.get() != null) {
frozenCacheStats = frozenCacheService.get().getStats();
} else {
frozenCacheStats = FrozenCacheService.Stats.EMPTY;
}
return new NodeCachesStatsResponse(
clusterService.localNode(),
frozenCacheStats.getNumberOfRegions(),
frozenCacheStats.getSize(),
frozenCacheStats.getRegionSize(),
frozenCacheStats.getWriteCount(),
frozenCacheStats.getWriteBytes(),
frozenCacheStats.getReadCount(),
frozenCacheStats.getReadBytes(),
frozenCacheStats.getEvictCount()
);
}
public static final class NodeRequest extends TransportRequest {
public NodeRequest() {}
public NodeRequest(StreamInput in) throws IOException {
super(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
public static final class NodesRequest extends BaseNodesRequest<NodesRequest> {
public NodesRequest(String[] nodes) {
super(nodes);
}
public NodesRequest(StreamInput in) throws IOException {
super(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
public static class NodeCachesStatsResponse extends BaseNodeResponse implements ToXContentFragment {
private final int numRegions;
private final long size;
private final long regionSize;
private final long writes;
private final long bytesWritten;
private final long reads;
private final long bytesRead;
private final long evictions;
public NodeCachesStatsResponse(
DiscoveryNode node,
int numRegions,
long size,
long regionSize,
long writes,
long bytesWritten,
long reads,
long bytesRead,
long evictions
) {
super(node);
this.numRegions = numRegions;
this.size = size;
this.regionSize = regionSize;
this.writes = writes;
this.bytesWritten = bytesWritten;
this.reads = reads;
this.bytesRead = bytesRead;
this.evictions = evictions;
}
public NodeCachesStatsResponse(StreamInput in) throws IOException {
super(in);
this.numRegions = in.readVInt();
this.size = in.readVLong();
this.regionSize = in.readVLong();
this.writes = in.readVLong();
this.bytesWritten = in.readVLong();
this.reads = in.readVLong();
this.bytesRead = in.readVLong();
this.evictions = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(numRegions);
out.writeVLong(size);
out.writeVLong(regionSize);
out.writeVLong(writes);
out.writeVLong(bytesWritten);
out.writeVLong(reads);
out.writeVLong(bytesRead);
out.writeVLong(evictions);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(getNode().getId());
{
builder.startObject("shared_cache");
{
builder.field("reads", reads);
builder.humanReadableField("bytes_read_in_bytes", "bytes_read", ByteSizeValue.ofBytes(bytesRead));
builder.field("writes", writes);
builder.humanReadableField("bytes_written_in_bytes", "bytes_written", ByteSizeValue.ofBytes(bytesWritten));
builder.field("evictions", evictions);
builder.field("num_regions", numRegions);
builder.humanReadableField("size_in_bytes", "size", ByteSizeValue.ofBytes(size));
builder.humanReadableField("region_size_in_bytes", "region_size", ByteSizeValue.ofBytes(regionSize));
}
builder.endObject();
}
builder.endObject();
return builder;
}
}
public static class NodesCachesStatsResponse extends BaseNodesResponse<NodeCachesStatsResponse> implements ToXContentObject {
public NodesCachesStatsResponse(StreamInput in) throws IOException {
super(in);
}
public NodesCachesStatsResponse(ClusterName clusterName, List<NodeCachesStatsResponse> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}
@Override
protected List<NodeCachesStatsResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readList(NodeCachesStatsResponse::new);
}
@Override
protected void writeNodesTo(StreamOutput out, List<NodeCachesStatsResponse> nodes) throws IOException {
out.writeList(nodes);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
builder.startObject("nodes");
for (NodeCachesStatsResponse node : getNodes()) {
node.toXContent(builder, params);
}
builder.endObject();
}
builder.endObject();
return builder;
}
}
}
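
`resolveRequest` above restricts the request to data nodes: with no node IDs it targets every data node, otherwise it keeps only the requested IDs that are keys of the data-node map. The sketch below restates that selection logic with plain collections. The class name is hypothetical, and `String` values stand in for `DiscoveryNode` instances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical restatement of the node selection; strings stand in for DiscoveryNode. */
final class ResolveNodesSketch {
    private ResolveNodesSketch() {}

    /**
     * Returns all data nodes when no IDs are requested; otherwise returns
     * only the nodes whose ID is a key of dataNodes, in request order.
     * Requested IDs that are not data nodes are silently dropped.
     */
    static List<String> resolve(String[] requestedIds, Map<String, String> dataNodes) {
        if (requestedIds == null || requestedIds.length == 0) {
            return new ArrayList<>(dataNodes.values());
        }
        List<String> resolved = new ArrayList<>();
        for (String id : requestedIds) {
            String node = dataNodes.get(id);
            if (node != null) {
                resolved.add(node);
            }
        }
        return resolved;
    }
}
```

One consequence visible in the sketch is that a requested identifier matching no data-node key yields no entry for that node rather than an error. Whether the real action behaves the same way for node names or `_local` depends on how the IDs are resolved before this method runs, which this hunk does not show.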

View file

@ -781,5 +781,4 @@ public class CacheService extends AbstractLifecycleComponent {
return "cache file event [type=" + type + ", value=" + value + ']'; return "cache file event [type=" + type + ", value=" + value + ']';
} }
} }
} }

View file

@ -49,11 +49,11 @@ import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.xpack.searchablesnapshots.cache.common.CacheFile;
import org.elasticsearch.xpack.searchablesnapshots.cache.common.CacheKey;
import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.xpack.searchablesnapshots.cache.common.ByteRange; import org.elasticsearch.xpack.searchablesnapshots.cache.common.ByteRange;
import org.elasticsearch.xpack.searchablesnapshots.cache.common.CacheFile;
import org.elasticsearch.xpack.searchablesnapshots.cache.common.CacheKey;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;

View file

@ -28,12 +28,12 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.xpack.searchablesnapshots.cache.common.CacheKey;
import org.elasticsearch.xpack.searchablesnapshots.cache.common.SparseFileTracker;
import org.elasticsearch.node.NodeRoleSettings; import org.elasticsearch.node.NodeRoleSettings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.DataTier; import org.elasticsearch.xpack.core.DataTier;
import org.elasticsearch.xpack.searchablesnapshots.cache.common.ByteRange; import org.elasticsearch.xpack.searchablesnapshots.cache.common.ByteRange;
import org.elasticsearch.xpack.searchablesnapshots.cache.common.CacheKey;
import org.elasticsearch.xpack.searchablesnapshots.cache.common.SparseFileTracker;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
@ -48,6 +48,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -169,10 +170,12 @@ public class FrozenCacheService implements Releasable {
private final KeyedLock<CacheKey> keyedLock = new KeyedLock<>(); private final KeyedLock<CacheKey> keyedLock = new KeyedLock<>();
private final SharedBytes sharedBytes; private final SharedBytes sharedBytes;
private final long cacheSize;
private final long regionSize; private final long regionSize;
private final ByteSizeValue rangeSize; private final ByteSizeValue rangeSize;
private final ByteSizeValue recoveryRangeSize; private final ByteSizeValue recoveryRangeSize;
private final int numRegions;
private final ConcurrentLinkedQueue<Integer> freeRegions = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<Integer> freeRegions = new ConcurrentLinkedQueue<>();
private final Entry<CacheFileRegion>[] freqs; private final Entry<CacheFileRegion>[] freqs;
private final int maxFreq; private final int maxFreq;
@ -182,12 +185,20 @@ public class FrozenCacheService implements Releasable {
private final CacheDecayTask decayTask; private final CacheDecayTask decayTask;
private final LongAdder writeCount = new LongAdder();
private final LongAdder writeBytes = new LongAdder();
private final LongAdder readCount = new LongAdder();
private final LongAdder readBytes = new LongAdder();
private final LongAdder evictCount = new LongAdder();
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
public FrozenCacheService(NodeEnvironment environment, Settings settings, ThreadPool threadPool) { public FrozenCacheService(NodeEnvironment environment, Settings settings, ThreadPool threadPool) {
this.currentTimeSupplier = threadPool::relativeTimeInMillis; this.currentTimeSupplier = threadPool::relativeTimeInMillis;
final long cacheSize = SNAPSHOT_CACHE_SIZE_SETTING.get(settings).getBytes(); this.cacheSize = SNAPSHOT_CACHE_SIZE_SETTING.get(settings).getBytes();
final long regionSize = SNAPSHOT_CACHE_REGION_SIZE_SETTING.get(settings).getBytes(); final long regionSize = SNAPSHOT_CACHE_REGION_SIZE_SETTING.get(settings).getBytes();
final int numRegions = Math.toIntExact(cacheSize / regionSize); this.numRegions = Math.toIntExact(cacheSize / regionSize);
keyMapping = new ConcurrentHashMap<>(); keyMapping = new ConcurrentHashMap<>();
if (Assertions.ENABLED) { if (Assertions.ENABLED) {
regionOwners = new AtomicReference[numRegions]; regionOwners = new AtomicReference[numRegions];
@ -206,7 +217,7 @@ public class FrozenCacheService implements Releasable {
this.minTimeDelta = SNAPSHOT_CACHE_MIN_TIME_DELTA_SETTING.get(settings).millis(); this.minTimeDelta = SNAPSHOT_CACHE_MIN_TIME_DELTA_SETTING.get(settings).millis();
freqs = new Entry[maxFreq]; freqs = new Entry[maxFreq];
try { try {
sharedBytes = new SharedBytes(numRegions, regionSize, environment); sharedBytes = new SharedBytes(numRegions, regionSize, environment, writeBytes::add, readBytes::add);
} catch (IOException e) { } catch (IOException e) {
throw new UncheckedIOException(e); throw new UncheckedIOException(e);
} }
@ -345,6 +356,19 @@ public class FrozenCacheService implements Releasable {
return freeRegions.size(); return freeRegions.size();
} }
public Stats getStats() {
return new Stats(
numRegions,
cacheSize,
regionSize,
evictCount.sum(),
writeCount.sum(),
writeBytes.sum(),
readCount.sum(),
readBytes.sum()
);
}
private synchronized boolean invariant(final Entry<CacheFileRegion> e, boolean present) { private synchronized boolean invariant(final Entry<CacheFileRegion> e, boolean present) {
boolean found = false; boolean found = false;
for (int i = 0; i < maxFreq; i++) { for (int i = 0; i < maxFreq; i++) {
@ -595,6 +619,7 @@ public class FrozenCacheService implements Releasable {
public boolean tryEvict() { public boolean tryEvict() {
if (refCount() <= 1 && evicted.compareAndSet(false, true)) { if (refCount() <= 1 && evicted.compareAndSet(false, true)) {
logger.trace("evicted {} with channel offset {}", regionKey, physicalStartOffset()); logger.trace("evicted {} with channel offset {}", regionKey, physicalStartOffset());
evictCount.increment();
decRef(); decRef();
return true; return true;
} }
@ -604,6 +629,7 @@ public class FrozenCacheService implements Releasable {
public boolean forceEvict() { public boolean forceEvict() {
if (evicted.compareAndSet(false, true)) { if (evicted.compareAndSet(false, true)) {
logger.trace("force evicted {} with channel offset {}", regionKey, physicalStartOffset()); logger.trace("force evicted {} with channel offset {}", regionKey, physicalStartOffset());
evictCount.increment();
decRef(); decRef();
return true; return true;
} }
@ -614,10 +640,6 @@ public class FrozenCacheService implements Releasable {
return evicted.get(); return evicted.get();
} }
public boolean isReleased() {
return isEvicted() && refCount() == 0;
}
@Override @Override
protected void closeInternal() { protected void closeInternal() {
// now actually free the region associated with this chunk // now actually free the region associated with this chunk
@ -676,6 +698,7 @@ public class FrozenCacheService implements Releasable {
gap.end() - gap.start(), gap.end() - gap.start(),
progress -> gap.onProgress(start + progress) progress -> gap.onProgress(start + progress)
); );
writeCount.increment();
} finally { } finally {
decRef(); decRef();
} }
@ -717,6 +740,7 @@ public class FrozenCacheService implements Releasable {
+ '-' + '-'
+ rangeToRead.start() + rangeToRead.start()
+ ']'; + ']';
readCount.increment();
listener.onResponse(read); listener.onResponse(read);
}, listener::onFailure); }, listener::onFailure);
} }
@ -827,4 +851,70 @@ public class FrozenCacheService implements Releasable {
 void fillCacheRange(SharedBytes.IO channel, long channelPos, long relativePos, long length, Consumer<Long> progressUpdater)
 throws IOException;
 }
+public static class Stats {
+public static final Stats EMPTY = new Stats(0, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
+private final int numberOfRegions;
+private final long size;
+private final long regionSize;
+private final long evictCount;
+private final long writeCount;
+private final long writeBytes;
+private final long readCount;
+private final long readBytes;
+private Stats(
+int numberOfRegions,
+long size,
+long regionSize,
+long evictCount,
+long writeCount,
+long writeBytes,
+long readCount,
+long readBytes
+) {
+this.numberOfRegions = numberOfRegions;
+this.size = size;
+this.regionSize = regionSize;
+this.evictCount = evictCount;
+this.writeCount = writeCount;
+this.writeBytes = writeBytes;
+this.readCount = readCount;
+this.readBytes = readBytes;
+}
+public int getNumberOfRegions() {
+return numberOfRegions;
+}
+public long getSize() {
+return size;
+}
+public long getRegionSize() {
+return regionSize;
+}
+public long getEvictCount() {
+return evictCount;
+}
+public long getWriteCount() {
+return writeCount;
+}
+public long getWriteBytes() {
+return writeBytes;
+}
+public long getReadCount() {
+return readCount;
+}
+public long getReadBytes() {
+return readBytes;
+}
+}
 }
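
The hunks above increment `writeCount` and `readCount` on the hot path and expose the totals through an immutable `Stats` object. A minimal sketch of that pattern follows. It assumes the counters are `java.util.concurrent.atomic.LongAdder` instances (the diff only shows that they have an `increment()` method), and the class and method names (`CacheCountersSketch`, `onRead`, `onWrite`, `snapshot`) are hypothetical, not taken from `FrozenCacheService`. Unlike the commit, which counts operations in `FrozenCacheService` and bytes in `SharedBytes`, the sketch updates both in one place for brevity.

```java
import java.util.concurrent.atomic.LongAdder;

// Illustrative sketch only: names are hypothetical, not taken from FrozenCacheService.
public class CacheCountersSketch {

    // Immutable point-in-time view, similar in spirit to the Stats class in the diff.
    public static final class Snapshot {
        private final long readCount;
        private final long readBytes;
        private final long writeCount;
        private final long writeBytes;

        private Snapshot(long readCount, long readBytes, long writeCount, long writeBytes) {
            this.readCount = readCount;
            this.readBytes = readBytes;
            this.writeCount = writeCount;
            this.writeBytes = writeBytes;
        }

        public long getReadCount() { return readCount; }
        public long getReadBytes() { return readBytes; }
        public long getWriteCount() { return writeCount; }
        public long getWriteBytes() { return writeBytes; }
    }

    // Assumption: LongAdder, chosen because it tolerates many concurrent writers.
    private final LongAdder readCount = new LongAdder();
    private final LongAdder readBytes = new LongAdder();
    private final LongAdder writeCount = new LongAdder();
    private final LongAdder writeBytes = new LongAdder();

    // Record one completed read of the given size.
    public void onRead(int bytes) {
        readCount.increment();
        readBytes.add(bytes);
    }

    // Record one completed write of the given size.
    public void onWrite(int bytes) {
        writeCount.increment();
        writeBytes.add(bytes);
    }

    // Copy the current sums into an immutable object that is safe to serialize.
    public Snapshot snapshot() {
        return new Snapshot(readCount.sum(), readBytes.sum(), writeCount.sum(), writeBytes.sum());
    }
}
```

Each field of the snapshot is read with a separate `sum()` call, so under concurrent updates the four values are not guaranteed to describe exactly the same instant. For monitoring statistics that is usually an acceptable trade-off.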


@ -27,6 +27,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.Map;
+import java.util.function.IntConsumer;
 public class SharedBytes extends AbstractRefCounted {
@ -47,10 +48,13 @@ public class SharedBytes extends AbstractRefCounted {
 // TODO: for systems like Windows without true p-write/read support we should split this up into multiple channels since positional
 // operations in #IO are not contention-free there (https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6265734)
 private final FileChannel fileChannel;
 private final Path path;
-SharedBytes(int numRegions, long regionSize, NodeEnvironment environment) throws IOException {
+private final IntConsumer writeBytes;
+private final IntConsumer readBytes;
+SharedBytes(int numRegions, long regionSize, NodeEnvironment environment, IntConsumer writeBytes, IntConsumer readBytes)
+throws IOException {
 super("shared-bytes");
 this.numRegions = numRegions;
 this.regionSize = regionSize;
@ -90,6 +94,8 @@ public class SharedBytes extends AbstractRefCounted {
 }
 }
 this.path = cacheFile;
+this.writeBytes = writeBytes;
+this.readBytes = readBytes;
 }
 /**
@ -169,7 +175,9 @@ public class SharedBytes extends AbstractRefCounted {
 @SuppressForbidden(reason = "Use positional reads on purpose")
 public int read(ByteBuffer dst, long position) throws IOException {
 checkOffsets(position, dst.remaining());
-return fileChannel.read(dst, position);
+final int bytesRead = fileChannel.read(dst, position);
+readBytes.accept(bytesRead);
+return bytesRead;
 }
 @SuppressForbidden(reason = "Use positional writes on purpose")
@ -178,7 +186,9 @@ public class SharedBytes extends AbstractRefCounted {
 assert position % PAGE_SIZE == 0;
 assert src.remaining() % PAGE_SIZE == 0;
 checkOffsets(position, src.remaining());
-return fileChannel.write(src, position);
+final int bytesWritten = fileChannel.write(src, position);
+writeBytes.accept(bytesWritten);
+return bytesWritten;
 }
 private void checkOffsets(long position, long length) {
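
The `SharedBytes` changes above pass two `IntConsumer` callbacks into the constructor and invoke them with the byte count returned by each positional read or write. A rough, self-contained sketch of that callback wiring is shown below. `CountingChannelSketch` and its `demo()` helper are hypothetical names, the `LongAdder` sinks are an assumption, and the `bytesRead > 0` guard is an addition in this sketch (`FileChannel.read` returns -1 at end of file); the code in the diff passes the returned value through unconditionally.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.IntConsumer;

// Illustrative sketch only: a tiny stand-in for the callback wiring, not the SharedBytes class.
public class CountingChannelSketch implements AutoCloseable {
    private final FileChannel channel;
    private final IntConsumer writeBytes;
    private final IntConsumer readBytes;

    public CountingChannelSketch(Path file, IntConsumer writeBytes, IntConsumer readBytes) throws IOException {
        this.channel = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE);
        this.writeBytes = writeBytes;
        this.readBytes = readBytes;
    }

    // Positional read that reports the number of bytes actually read.
    public int read(ByteBuffer dst, long position) throws IOException {
        final int bytesRead = channel.read(dst, position);
        if (bytesRead > 0) { // guard added in this sketch: skip the -1 end-of-file marker
            readBytes.accept(bytesRead);
        }
        return bytesRead;
    }

    // Positional write that reports the number of bytes actually written.
    public int write(ByteBuffer src, long position) throws IOException {
        final int bytesWritten = channel.write(src, position);
        writeBytes.accept(bytesWritten);
        return bytesWritten;
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }

    // Writes 8 bytes to a temp file, reads them back, and returns
    // {bytes written, bytes read} as observed by the two callbacks.
    public static long[] demo() {
        final LongAdder written = new LongAdder();
        final LongAdder read = new LongAdder();
        try {
            final Path file = Files.createTempFile("counting-channel", ".bin");
            try (CountingChannelSketch io = new CountingChannelSketch(file, written::add, read::add)) {
                io.write(ByteBuffer.wrap(new byte[8]), 0L);
                io.read(ByteBuffer.allocate(8), 0L);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new long[] { written.sum(), read.sum() };
    }
}
```

Injecting plain `IntConsumer` callbacks keeps the I/O class unaware of how the statistics are stored, which is presumably why the commit threads them through the constructor rather than referencing the counters directly.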


@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.searchablesnapshots.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.searchablesnapshots.action.cache.TransportSearchableSnapshotsNodeCachesStatsAction;
import java.util.List;
import static org.elasticsearch.rest.RestRequest.Method.GET;
/**
* Node level stats for searchable snapshots caches.
*/
public class RestSearchableSnapshotsNodeCachesStatsAction extends BaseRestHandler {
@Override
public List<RestHandler.Route> routes() {
return List.of(
new RestHandler.Route(GET, "/_searchable_snapshots/cache/stats"),
new RestHandler.Route(GET, "/_searchable_snapshots/{nodeId}/cache/stats")
);
}
@Override
public String getName() {
return "searchable_snapshots_cache_stats_action";
}
@Override
public BaseRestHandler.RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
final String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId"));
return channel -> client.execute(
TransportSearchableSnapshotsNodeCachesStatsAction.TYPE,
new TransportSearchableSnapshotsNodeCachesStatsAction.NodesRequest(nodesIds),
new RestToXContentListener<>(channel)
);
}
}
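
The handler above turns the optional `nodeId` path parameter into an array of node IDs before building the request. The sketch below illustrates the intended effect with a hypothetical stand-in for `Strings.splitStringByCommaToArray`; it is not the Elasticsearch implementation, and edge cases such as whitespace or empty segments may be handled differently there. It also assumes that an empty array means "target every node", which matches the route without a node ID but is not shown in this diff.

```java
// Illustrative sketch only: a plain-Java stand-in, not the Elasticsearch Strings utility.
public class NodeIdParamSketch {

    // Hypothetical equivalent of splitting the "nodeId" parameter on commas.
    // A missing or empty parameter yields an empty array.
    public static String[] splitByComma(String value) {
        if (value == null || value.isEmpty()) {
            return new String[0];
        }
        return value.split(",");
    }

    // Assumption: an empty list of node IDs selects all nodes.
    public static String describeTarget(String nodeIdParam) {
        final String[] nodeIds = splitByComma(nodeIdParam);
        return nodeIds.length == 0 ? "all nodes" : String.join(", ", nodeIds);
    }
}
```

With this sketch, `describeTarget(null)` corresponds to `GET /_searchable_snapshots/cache/stats`, while `describeTarget("node-1,node-2")` corresponds to `GET /_searchable_snapshots/node-1,node-2/cache/stats`.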


@ -159,7 +159,7 @@ public class FrozenIndexInput extends MetadataCachingIndexInput {
 final long streamStartPosition = rangeToWrite.start() + relativePos;
 try (InputStream input = openInputStreamFromBlobStore(streamStartPosition, len)) {
-this.writeCacheFile(channel, input, channelPos, relativePos, len, progressUpdater, startTimeNanos);
+writeCacheFile(channel, input, channelPos, relativePos, len, progressUpdater, startTimeNanos);
 }
 },
 directory.cacheFetchAsyncExecutor()


@ -164,6 +164,7 @@ public class Constants {
 "cluster:admin/xpack/rollup/start",
 "cluster:admin/xpack/rollup/stop",
 "cluster:admin/xpack/searchable_snapshots/cache/clear",
+"cluster:admin/xpack/searchable_snapshots/cache/stats",
 "cluster:admin/xpack/security/api_key/create",
 "cluster:admin/xpack/security/api_key/get",
 "cluster:admin/xpack/security/api_key/grant",