Lower cased nodeToXContent.

Added a protection for relocatingTo potentially be in.
Removed indexPrefix settings and fixed it to ".marvel" (will add a suffix later)
Renamed java plugin to "marvel".
This commit is contained in:
Boaz Leskes 2013-12-05 13:54:01 +01:00
parent 157ea75b7d
commit de507d43ea
7 changed files with 110 additions and 105 deletions

View file

@ -194,8 +194,6 @@ public class ExportersService extends AbstractLifecycleComponent<ExportersServic
} catch (Throwable t) {
logger.error("StatsExporter [{}] has thrown an exception:", t, e.name());
}
}
}
@ -264,97 +262,101 @@ public class ExportersService extends AbstractLifecycleComponent<ExportersServic
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.localNodeMaster()) {
// only collect if i'm master.
long timestamp = System.currentTimeMillis();
if (!event.previousState().nodes().localNodeMaster()) {
pendingEventsQueue.add(new NodeEvent.ElectedAsMaster(timestamp, event.state().nodes().localNode(), event.source()));
}
if (!event.localNodeMaster()) {
return;
}
// only collect if i'm master.
long timestamp = System.currentTimeMillis();
if (!event.previousState().nodes().localNodeMaster()) {
pendingEventsQueue.add(new NodeEvent.ElectedAsMaster(timestamp, event.state().nodes().localNode(), event.source()));
}
for (DiscoveryNode node : event.nodesDelta().addedNodes()) {
pendingEventsQueue.add(new NodeEvent.NodeJoinLeave(timestamp, node, true, event.source()));
}
for (DiscoveryNode node : event.nodesDelta().addedNodes()) {
pendingEventsQueue.add(new NodeEvent.NodeJoinLeave(timestamp, node, true, event.source()));
}
for (DiscoveryNode node : event.nodesDelta().removedNodes()) {
pendingEventsQueue.add(new NodeEvent.NodeJoinLeave(timestamp, node, false, event.source()));
}
for (DiscoveryNode node : event.nodesDelta().removedNodes()) {
pendingEventsQueue.add(new NodeEvent.NodeJoinLeave(timestamp, node, false, event.source()));
}
if (event.routingTableChanged()) {
// hunt for initializing shards
RoutingNodes previousRoutingNodes = event.previousState().routingNodes();
for (ShardRouting shardRouting : event.state().routingNodes().shardsWithState(ShardRoutingState.INITIALIZING)) {
RoutingNode oldRoutingNode = previousRoutingNodes.node(shardRouting.currentNodeId());
boolean changed = true;
if (oldRoutingNode != null) {
for (ShardRouting oldShardRouting : oldRoutingNode.shards()) {
if (oldShardRouting.equals(shardRouting)) {
changed = false;
break;
}
if (event.routingTableChanged()) {
// hunt for initializing shards
RoutingNodes previousRoutingNodes = event.previousState().routingNodes();
for (ShardRouting shardRouting : event.state().routingNodes().shardsWithState(ShardRoutingState.INITIALIZING)) {
RoutingNode oldRoutingNode = previousRoutingNodes.node(shardRouting.currentNodeId());
boolean changed = true;
if (oldRoutingNode != null) {
for (ShardRouting oldShardRouting : oldRoutingNode.shards()) {
if (oldShardRouting.equals(shardRouting)) {
changed = false;
break;
}
}
if (!changed) {
continue; // no event.
}
if (!changed) {
continue; // no event.
}
if (shardRouting.relocatingNodeId() != null) {
// if relocating node is not null, this shard is initializing due to a relocation
ShardRouting tmpShardRouting = new MutableShardRouting(
shardRouting.index(), shardRouting.id(), shardRouting.relocatingNodeId(),
shardRouting.currentNodeId(), shardRouting.primary(),
ShardRoutingState.RELOCATING, shardRouting.version());
DiscoveryNode relocatingTo = null;
if (tmpShardRouting.relocatingNodeId() != null) {
relocatingTo = event.state().nodes().get(tmpShardRouting.relocatingNodeId());
}
if (shardRouting.relocatingNodeId() != null) {
// if relocating node is not null, this shard is initializing due to a relocation
ShardRouting tmpShardRouting = new MutableShardRouting(
shardRouting.index(), shardRouting.id(), shardRouting.relocatingNodeId(),
shardRouting.currentNodeId(), shardRouting.primary(),
ShardRoutingState.RELOCATING, shardRouting.version());
pendingEventsQueue.add(new RoutingEvent.ShardRelocating(timestamp, tmpShardRouting,
clusterService.state().nodes().get(tmpShardRouting.relocatingNodeId()),
clusterService.state().nodes().get(tmpShardRouting.currentNodeId())
));
} else {
pendingEventsQueue.add(new RoutingEvent.ShardInitializing(timestamp, shardRouting,
clusterService.state().nodes().get(shardRouting.currentNodeId())
));
}
pendingEventsQueue.add(new RoutingEvent.ShardRelocating(timestamp, tmpShardRouting,
relocatingTo, event.state().nodes().get(tmpShardRouting.currentNodeId())
));
} else {
pendingEventsQueue.add(new RoutingEvent.ShardInitializing(timestamp, shardRouting,
event.state().nodes().get(shardRouting.currentNodeId())
));
}
}
if (event.blocksChanged()) {
// TODO: Add index blocks
List<ClusterBlock> removed = newArrayList();
List<ClusterBlock> added = newArrayList();
ImmutableSet<ClusterBlock> currentBlocks = event.state().blocks().global();
ImmutableSet<ClusterBlock> previousBlocks = event.previousState().blocks().global();
for (ClusterBlock block : previousBlocks) {
if (!currentBlocks.contains(block)) {
removed.add(block);
}
}
for (ClusterBlock block : currentBlocks) {
if (!previousBlocks.contains(block)) {
added.add(block);
}
}
for (ClusterBlock block : added) {
pendingEventsQueue.add(new ClusterEvent.ClusterBlock(timestamp, block, true, event.source()));
}
for (ClusterBlock block : removed) {
pendingEventsQueue.add(new ClusterEvent.ClusterBlock(timestamp, block, false, event.source()));
}
}
for (String index : event.indicesCreated()) {
pendingEventsQueue.add(new IndexMetaDataEvent.IndexCreateDelete(timestamp, index, true, event.source()));
}
for (String index : event.indicesDeleted()) {
pendingEventsQueue.add(new IndexMetaDataEvent.IndexCreateDelete(timestamp, index, false, event.source()));
}
}
if (event.blocksChanged()) {
// TODO: Add index blocks
List<ClusterBlock> removed = newArrayList();
List<ClusterBlock> added = newArrayList();
ImmutableSet<ClusterBlock> currentBlocks = event.state().blocks().global();
ImmutableSet<ClusterBlock> previousBlocks = event.previousState().blocks().global();
for (ClusterBlock block : previousBlocks) {
if (!currentBlocks.contains(block)) {
removed.add(block);
}
}
for (ClusterBlock block : currentBlocks) {
if (!previousBlocks.contains(block)) {
added.add(block);
}
}
for (ClusterBlock block : added) {
pendingEventsQueue.add(new ClusterEvent.ClusterBlock(timestamp, block, true, event.source()));
}
for (ClusterBlock block : removed) {
pendingEventsQueue.add(new ClusterEvent.ClusterBlock(timestamp, block, false, event.source()));
}
}
for (String index : event.indicesCreated()) {
pendingEventsQueue.add(new IndexMetaDataEvent.IndexCreateDelete(timestamp, index, true, event.source()));
}
for (String index : event.indicesDeleted()) {
pendingEventsQueue.add(new IndexMetaDataEvent.IndexCreateDelete(timestamp, index, false, event.source()));
}
}
}
@ -380,11 +382,11 @@ public class ExportersService extends AbstractLifecycleComponent<ExportersServic
@Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) {
DiscoveryNode relocatedTo = null;
if (indexShard.routingEntry().relocating()) {
if (indexShard != null && indexShard.routingEntry().relocating()) {
relocatedTo = clusterService.state().nodes().get(indexShard.routingEntry().relocatingNodeId());
}
pendingEventsQueue.add(new ShardEvent(System.currentTimeMillis(), ShardEvent.EventType.CLOSED,
indexShard.shardId(), clusterService.localNode(), relocatedTo, indexShard.routingEntry()));
shardId, clusterService.localNode(), relocatedTo, indexShard != null ? indexShard.routingEntry() : null));
}
}

View file

@ -36,12 +36,12 @@ public class Plugin extends AbstractPlugin {
@Override
public String name() {
return "marvel - stats exporter";
return "marvel";
}
@Override
public String description() {
return "Monitoring with an elastic sauce";
return "Elasticsearch Management & Monitoring";
}
@Override

View file

@ -29,7 +29,7 @@ import java.util.Map;
public class Utils {
public static XContentBuilder NodeToXContent(DiscoveryNode node, XContentBuilder builder) throws IOException {
public static XContentBuilder nodeToXContent(DiscoveryNode node, XContentBuilder builder) throws IOException {
builder.field("id", node.id());
builder.field("name", node.name());
builder.field("transport_address", node.address());

View file

@ -98,7 +98,7 @@ public abstract class NodeEvent extends Event {
public XContentBuilder addXContentBody(XContentBuilder builder, ToXContent.Params params) throws IOException {
super.addXContentBody(builder, params);
builder.startObject("node");
Utils.NodeToXContent(node, builder);
Utils.nodeToXContent(node, builder);
builder.endObject();
return builder;
}

View file

@ -65,7 +65,7 @@ public abstract class RoutingEvent extends Event {
builder.field("index", shardRouting.index());
builder.field("shard_id", shardRouting.id());
builder.startObject("node");
Utils.NodeToXContent(node, builder);
Utils.nodeToXContent(node, builder);
builder.endObject();
builder.field("routing"); // shard routing opens it's own object.
shardRouting.toXContent(builder, params);
@ -106,16 +106,21 @@ public abstract class RoutingEvent extends Event {
@Override
String conciseDescription() {
return shardRouting.shardId() + "[" + (shardRouting.primary() ? "P" : "R") + "] set to relocate to "
+ relocatingTo + " from " + node;
String s = shardRouting.shardId() + "[" + (shardRouting.primary() ? "P" : "R") + "] set to relocate";
if (relocatingTo != null) {
s += " to " + relocatingTo;
}
return s + " from " + node;
}
@Override
public XContentBuilder addXContentBody(XContentBuilder builder, ToXContent.Params params) throws IOException {
super.addXContentBody(builder, params);
builder.startObject("relocated_to");
Utils.NodeToXContent(relocatingTo, builder);
builder.endObject();
if (relocatingTo != null) {
builder.startObject("relocated_to");
Utils.nodeToXContent(relocatingTo, builder);
builder.endObject();
}
return builder;
}
}

View file

@ -81,7 +81,6 @@ public class ShardEvent extends Event {
}
default:
throw new ElasticSearchException("unmapped event type [" + event + "]");
}
}
@ -92,7 +91,7 @@ public class ShardEvent extends Event {
builder.field("index", shardId.index());
builder.field("shard_id", shardId.id());
builder.startObject("node");
Utils.NodeToXContent(node, builder);
Utils.nodeToXContent(node, builder);
builder.endObject();
if (shardRouting != null) {
builder.field("routing");
@ -100,7 +99,7 @@ public class ShardEvent extends Event {
}
if (relocatingNode != null) {
builder.startObject(event == EventType.STARTED ? "relocated_from" : "relocated_to");
Utils.NodeToXContent(relocatingNode, builder);
Utils.nodeToXContent(relocatingNode, builder);
builder.endObject();
}
return builder;

View file

@ -25,7 +25,6 @@ import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -87,7 +86,7 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
indicesStatsRenderer = new IndicesStatsRenderer();
eventsRenderer = new EventsRenderer();
logger.info("Initialized with targets: {}, index prefix [{}], index time format [{}]", hosts, indexPrefix, indexTimeFormat);
logger.debug("Initialized with targets: {}, index prefix [{}], index time format [{}]", hosts, indexPrefix, indexTimeFormat);
}
@Override
@ -158,7 +157,9 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
for (int i = 0; i < renderer.length(); i++) {
XContentBuilder builder = XContentFactory.smileBuilder(os);
builder.startObject().startObject("index")
.field("_index", getIndexName()).field("_type", renderer.type(i)).endObject().endObject();
.field("_index", getIndexName())
.field("_type", renderer.type(i))
.endObject().endObject();
builder.flush();
os.write(SmileXContent.smileXContent.streamSeparator());
@ -252,9 +253,7 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
private boolean checkForIndexTemplate() {
try {
String templateName = "marvel.monitor.prefix-" + indexPrefix;
String templateName = "marvel";
logger.debug("checking of target has template [{}]", templateName);
// DO HEAD REQUEST, when elasticsearch supports it
@ -273,7 +272,7 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
OutputStream os = conn.getOutputStream();
XContentBuilder builder = XContentFactory.smileBuilder(os);
builder.startObject();
builder.field("template", indexPrefix + "*");
builder.field("template", ".marvel*");
builder.startObject("mappings").startObject("_default_");
builder.startArray("dynamic_templates").startObject().startObject("string_fields")
.field("match", "*")
@ -325,7 +324,7 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
private void addNodeInfo(XContentBuilder builder, String fieldname) throws IOException {
builder.startObject(fieldname);
Utils.NodeToXContent(discovery.localNode(), builder);
Utils.nodeToXContent(discovery.localNode(), builder);
builder.endObject();
}