Introduced NodeEvent and RoutingEvent. Cleaned up naming as we don't only export stats any more. Cluster Pulse is now showing node, index and routing events (shard + cluster still shipping).

This commit is contained in:
Boaz Leskes 2013-11-27 22:50:39 +01:00
parent 99ddd4101a
commit 6acc3258e1
7 changed files with 294 additions and 86 deletions

View file

@ -4,8 +4,8 @@
"query": {
"list": {
"0": {
"query": "_type:cluster_event",
"alias": "Cluster events",
"query": "_type:node_event",
"alias": "Node events",
"color": "#7EB26D",
"id": 0,
"pin": true,
@ -15,20 +15,20 @@
"1": {
"id": 1,
"color": "#EAB839",
"alias": "Index events",
"alias": "Index metadata events",
"pin": true,
"type": "lucene",
"enable": true,
"query": "_type:index_event"
"query": "_type:index_metadata_event"
},
"2": {
"id": 2,
"color": "#6ED0E0",
"alias": "Shard events",
"alias": "Routing events",
"pin": true,
"type": "lucene",
"enable": true,
"query": "_type:shard_event"
"query": "_type:routing_event"
}
},
"ids": [
@ -117,7 +117,7 @@
"panels": [
{
"error": false,
"span": 10,
"span": 9,
"editable": true,
"type": "table",
"loadingEditor": false,
@ -160,7 +160,7 @@
},
{
"error": false,
"span": 2,
"span": 3,
"editable": true,
"type": "terms",
"loadingEditor": false,

View file

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableSet;
@ -46,10 +47,7 @@ import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InternalIndicesService;
import org.elasticsearch.marvel.monitor.event.Event;
import org.elasticsearch.marvel.monitor.event.ClusterEvent;
import org.elasticsearch.marvel.monitor.event.IndexEvent;
import org.elasticsearch.marvel.monitor.event.ShardEvent;
import org.elasticsearch.marvel.monitor.event.*;
import org.elasticsearch.marvel.monitor.exporter.ESExporter;
import org.elasticsearch.marvel.monitor.exporter.StatsExporter;
import org.elasticsearch.node.service.NodeService;
@ -61,7 +59,7 @@ import java.util.concurrent.BlockingQueue;
import static org.elasticsearch.common.collect.Lists.newArrayList;
public class StatsExportersService extends AbstractLifecycleComponent<StatsExportersService> {
public class ExportersService extends AbstractLifecycleComponent<ExportersService> {
private final InternalIndicesService indicesService;
private final NodeService nodeService;
@ -82,10 +80,10 @@ public class StatsExportersService extends AbstractLifecycleComponent<StatsExpor
private final BlockingQueue<Event> pendingEventsQueue;
@Inject
public StatsExportersService(Settings settings, IndicesService indicesService,
NodeService nodeService, ClusterService clusterService,
Client client,
Discovery discovery) {
public ExportersService(Settings settings, IndicesService indicesService,
NodeService nodeService, ClusterService clusterService,
Client client,
Discovery discovery) {
super(settings);
this.indicesService = (InternalIndicesService) indicesService;
this.clusterService = clusterService;
@ -259,15 +257,53 @@ public class StatsExportersService extends AbstractLifecycleComponent<StatsExpor
// only collect if i'm master.
long timestamp = System.currentTimeMillis();
if (!event.previousState().nodes().localNodeMaster()) {
pendingEventsQueue.add(new ClusterEvent.ElectedAsMaster(timestamp, event.state().nodes().localNode(), event.source()));
pendingEventsQueue.add(new NodeEvent.ElectedAsMaster(timestamp, event.state().nodes().localNode(), event.source()));
}
for (DiscoveryNode node : event.nodesDelta().addedNodes()) {
pendingEventsQueue.add(new ClusterEvent.NodeJoinLeave(timestamp, node, true, event.source()));
pendingEventsQueue.add(new NodeEvent.NodeJoinLeave(timestamp, node, true, event.source()));
}
for (DiscoveryNode node : event.nodesDelta().removedNodes()) {
pendingEventsQueue.add(new ClusterEvent.NodeJoinLeave(timestamp, node, false, event.source()));
pendingEventsQueue.add(new NodeEvent.NodeJoinLeave(timestamp, node, false, event.source()));
}
if (event.routingTableChanged()) {
// hunt for initializing shards
RoutingNodes previousRoutingNodes = event.previousState().routingNodes();
for (ShardRouting shardRouting : event.state().routingNodes().shardsWithState(ShardRoutingState.INITIALIZING)) {
RoutingNode oldRoutingNode = previousRoutingNodes.node(shardRouting.currentNodeId());
boolean changed = true;
if (oldRoutingNode != null) {
for (ShardRouting oldShardRouting : oldRoutingNode.shards()) {
if (oldShardRouting.equals(shardRouting)) {
changed = false;
break;
}
}
}
if (!changed) {
continue; // no event.
}
if (shardRouting.relocatingNodeId() != null) {
// if relocating node is not null, this shard is initializing due to a relocation
ShardRouting tmpShardRouting = new MutableShardRouting(
shardRouting.index(), shardRouting.id(), shardRouting.relocatingNodeId(),
shardRouting.currentNodeId(), shardRouting.primary(),
ShardRoutingState.RELOCATING, shardRouting.version());
pendingEventsQueue.add(new RoutingEvent.ShardRelocating(timestamp, tmpShardRouting,
clusterService.state().nodes().get(tmpShardRouting.relocatingNodeId()),
clusterService.state().nodes().get(tmpShardRouting.currentNodeId())
));
} else {
pendingEventsQueue.add(new RoutingEvent.ShardInitializing(timestamp, shardRouting,
clusterService.state().nodes().get(shardRouting.currentNodeId())
));
}
}
}
if (event.blocksChanged()) {
@ -298,11 +334,11 @@ public class StatsExportersService extends AbstractLifecycleComponent<StatsExpor
}
for (String index : event.indicesCreated()) {
pendingEventsQueue.add(new IndexEvent.IndexCreateDelete(timestamp, index, true, event.source()));
pendingEventsQueue.add(new IndexMetaDataEvent.IndexCreateDelete(timestamp, index, true, event.source()));
}
for (String index : event.indicesDeleted()) {
pendingEventsQueue.add(new IndexEvent.IndexCreateDelete(timestamp, index, false, event.source()));
pendingEventsQueue.add(new IndexMetaDataEvent.IndexCreateDelete(timestamp, index, false, event.source()));
}
}

View file

@ -50,7 +50,7 @@ public class Plugin extends AbstractPlugin {
@Override
protected void configure() {
bind(StatsExportersService.class).asEagerSingleton();
bind(ExportersService.class).asEagerSingleton();
}
};
return ImmutableList.of(m);
@ -59,7 +59,7 @@ public class Plugin extends AbstractPlugin {
@Override
public Collection<Class<? extends LifecycleComponent>> services() {
Collection<Class<? extends LifecycleComponent>> l = new ArrayList<Class<? extends LifecycleComponent>>();
l.add(StatsExportersService.class);
l.add(ExportersService.class);
return l;
}
}

View file

@ -19,10 +19,8 @@ package org.elasticsearch.marvel.monitor.event;
*/
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.marvel.monitor.Utils;
import java.io.IOException;
@ -50,60 +48,6 @@ public abstract class ClusterEvent extends Event {
return builder;
}
public static class ElectedAsMaster extends ClusterEvent {
private final DiscoveryNode node;
public ElectedAsMaster(long timestamp, DiscoveryNode node, String event_source) {
super(timestamp, event_source);
this.node = node;
}
@Override
protected String event() {
return "elected_as_master";
}
@Override
String conciseDescription() {
return node.toString() + " became master";
}
// no need to render node as XContent as it will be done by the exporter.
}
public static class NodeJoinLeave extends ClusterEvent {
private final DiscoveryNode node;
private boolean joined;
public NodeJoinLeave(long timestamp, DiscoveryNode node, boolean joined, String event_source) {
super(timestamp, event_source);
this.node = node;
this.joined = joined;
}
@Override
protected String event() {
return (joined ? "node_joined" : "node_left");
}
@Override
String conciseDescription() {
return node.toString() + (joined ? " joined" : " left");
}
@Override
public XContentBuilder addXContentBody(XContentBuilder builder, ToXContent.Params params) throws IOException {
super.addXContentBody(builder, params);
builder.startObject("node");
Utils.NodeToXContent(node, builder);
builder.endObject();
return builder;
}
}
public static class ClusterBlock extends ClusterEvent {
private final org.elasticsearch.cluster.block.ClusterBlock block;
private boolean added;

View file

@ -19,25 +19,23 @@ package org.elasticsearch.marvel.monitor.event;
*/
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.marvel.monitor.Utils;
import java.io.IOException;
public abstract class IndexEvent extends Event {
public abstract class IndexMetaDataEvent extends Event {
protected final String event_source;
public IndexEvent(long timestamp, String event_source) {
public IndexMetaDataEvent(long timestamp, String event_source) {
super(timestamp);
this.event_source = event_source;
}
@Override
public String type() {
return "index_event";
return "index_metadata_event";
}
protected abstract String event();
@ -50,7 +48,7 @@ public abstract class IndexEvent extends Event {
return builder;
}
public static class IndexCreateDelete extends IndexEvent {
public static class IndexCreateDelete extends IndexMetaDataEvent {
private final String index;
private boolean created;

View file

@ -0,0 +1,107 @@
package org.elasticsearch.marvel.monitor.event;
/*
* Licensed to ElasticSearch under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.marvel.monitor.Utils;
import java.io.IOException;
public abstract class NodeEvent extends Event {
protected final String event_source;
public NodeEvent(long timestamp, String event_source) {
super(timestamp);
this.event_source = event_source;
}
@Override
public String type() {
return "node_event";
}
protected abstract String event();
@Override
public XContentBuilder addXContentBody(XContentBuilder builder, ToXContent.Params params) throws IOException {
super.addXContentBody(builder, params);
builder.field("event", event());
builder.field("event_source", event_source);
return builder;
}
public static class ElectedAsMaster extends NodeEvent {
private final DiscoveryNode node;
public ElectedAsMaster(long timestamp, DiscoveryNode node, String event_source) {
super(timestamp, event_source);
this.node = node;
}
@Override
protected String event() {
return "elected_as_master";
}
@Override
String conciseDescription() {
return node.toString() + " became master";
}
// no need to render node as XContent as it will be done by the exporter.
}
public static class NodeJoinLeave extends NodeEvent {
private final DiscoveryNode node;
private boolean joined;
public NodeJoinLeave(long timestamp, DiscoveryNode node, boolean joined, String event_source) {
super(timestamp, event_source);
this.node = node;
this.joined = joined;
}
@Override
protected String event() {
return (joined ? "node_joined" : "node_left");
}
@Override
String conciseDescription() {
return node.toString() + (joined ? " joined" : " left");
}
@Override
public XContentBuilder addXContentBody(XContentBuilder builder, ToXContent.Params params) throws IOException {
super.addXContentBody(builder, params);
builder.startObject("node");
Utils.NodeToXContent(node, builder);
builder.endObject();
return builder;
}
}
}

View file

@ -0,0 +1,123 @@
package org.elasticsearch.marvel.monitor.event;
/*
* Licensed to ElasticSearch under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.marvel.monitor.Utils;
import java.io.IOException;
public abstract class RoutingEvent extends Event {
public RoutingEvent(long timestamp) {
super(timestamp);
}
@Override
public String type() {
return "routing_event";
}
public abstract String event();
@Override
public XContentBuilder addXContentBody(XContentBuilder builder, ToXContent.Params params) throws IOException {
super.addXContentBody(builder, params);
builder.field("event", event());
return builder;
}
static abstract class RoutingShardEvent extends RoutingEvent {
protected final ShardRouting shardRouting;
protected final DiscoveryNode node;
public RoutingShardEvent(long timestamp, ShardRouting shardRouting, DiscoveryNode node) {
super(timestamp);
this.node = node;
this.shardRouting = shardRouting;
}
@Override
public XContentBuilder addXContentBody(XContentBuilder builder, ToXContent.Params params) throws IOException {
super.addXContentBody(builder, params);
builder.field("index", shardRouting.index());
builder.field("shard_id", shardRouting.id());
builder.startObject("node");
Utils.NodeToXContent(node, builder);
builder.endObject();
builder.field("routing"); // shard routing opens it's own object.
shardRouting.toXContent(builder, params);
return builder;
}
}
public static class ShardInitializing extends RoutingShardEvent {
public ShardInitializing(long timestamp, ShardRouting shardRouting, DiscoveryNode node) {
super(timestamp, shardRouting, node);
}
@Override
public String event() {
return "shard_initializing";
}
@Override
String conciseDescription() {
return shardRouting.shardId() + "[" + (shardRouting.primary() ? "P" : "R") + "] set to initializing on " + node;
}
}
public static class ShardRelocating extends RoutingShardEvent {
final DiscoveryNode relocatingTo;
public ShardRelocating(long timestamp, ShardRouting shardRouting, DiscoveryNode node, DiscoveryNode relocatingTo) {
super(timestamp, shardRouting, node);
this.relocatingTo = relocatingTo;
}
@Override
public String event() {
return "shard_relocating";
}
@Override
String conciseDescription() {
return shardRouting.shardId() + "[" + (shardRouting.primary() ? "P" : "R") + "] set to relocate to "
+ relocatingTo + " from " + node;
}
@Override
public XContentBuilder addXContentBody(XContentBuilder builder, ToXContent.Params params) throws IOException {
super.addXContentBody(builder, params);
builder.startObject("relocated_to");
Utils.NodeToXContent(relocatingTo, builder);
builder.endObject();
return builder;
}
}
}