Added uploading of Kibana dashboards; the Marvel index template is now based on marvel_index_template.json.

Boaz Leskes 2013-12-23 19:42:28 +01:00
parent c155484c2a
commit b00566113d
3 changed files with 199 additions and 56 deletions

@@ -48,6 +48,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.service.IndexShard;
@@ -90,7 +91,8 @@ public class ExportersService extends AbstractLifecycleComponent<ExportersService> {
@Inject
public ExportersService(Settings settings, IndicesService indicesService,
NodeService nodeService, ClusterService clusterService,
Client client, Discovery discovery, ClusterName clusterName) {
Client client, Discovery discovery, ClusterName clusterName,
Environment environment, Plugin marvelPlugin) {
super(settings);
this.indicesService = (InternalIndicesService) indicesService;
this.clusterService = clusterService;
@@ -105,7 +107,7 @@ public class ExportersService extends AbstractLifecycleComponent<ExportersService> {
pendingEventsQueue = ConcurrentCollections.newBlockingQueue();
if (componentSettings.getAsBoolean("enabled", true)) {
StatsExporter esExporter = new ESExporter(settings.getComponentSettings(ESExporter.class), discovery, clusterName);
StatsExporter esExporter = new ESExporter(settings.getComponentSettings(ESExporter.class), discovery, clusterName, environment, marvelPlugin);
this.exporters = ImmutableSet.of(esExporter);
} else {
this.exporters = ImmutableSet.of();
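The newly injected Environment and Plugin are what let the exporter locate the Kibana dashboard JSON files bundled under the plugin's _site directory. A minimal, self-contained sketch of the default path resolution the new ESExporter constructor performs (the class name is illustrative; pluginsDir and pluginName stand in for environment.pluginsFile() and marvelPlugin.name()):

import java.io.File;
import java.util.ArrayList;
import java.util.List;

class DashboardPathSketch {
    // Default location: <plugins dir>/<plugin name>/_site/app/dashboards/marvel/<dashboard>.json
    static List<File> defaultDashboards(File pluginsDir, String pluginName) {
        File base = new File(new File(pluginsDir, pluginName),
                "_site/app/dashboards/marvel".replace('/', File.separatorChar));
        List<File> dashboards = new ArrayList<File>();
        // same defaults as the es.upload.dashboards setting
        for (String name : new String[]{"overview.json", "cluster_pulse.json"}) {
            dashboards.add(new File(base, name));
        }
        return dashboards;
    }
}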

@@ -30,25 +30,26 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.common.joda.time.format.DateTimeFormat;
import org.elasticsearch.common.joda.time.format.DateTimeFormatter;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.smile.SmileXContent;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.env.Environment;
import org.elasticsearch.marvel.monitor.Plugin;
import org.elasticsearch.marvel.monitor.Utils;
import org.elasticsearch.marvel.monitor.event.Event;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Map;
public class ESExporter extends AbstractLifecycleComponent<ESExporter> implements StatsExporter<ESExporter> {
@@ -58,12 +59,16 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implements StatsExporter<ESExporter> {
final DateTimeFormatter indexTimeFormatter;
final int timeout;
// index to upload dashboards into.
final String kibanaIndex;
final String[] dashboardPathsToUpload;
final Discovery discovery;
final ClusterName clusterName;
public final static DateTimeFormatter defaultDatePrinter = Joda.forPattern("date_time").printer();
boolean checkedForIndexTemplate = false;
boolean checkedAndUploadedAllResources = false;
final NodeStatsRenderer nodeStatsRenderer;
final ShardStatsRenderer shardStatsRenderer;
@@ -72,7 +77,7 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implements StatsExporter<ESExporter> {
final ClusterStatsRenderer clusterStatsRenderer;
final EventsRenderer eventsRenderer;
public ESExporter(Settings settings, Discovery discovery, ClusterName clusterName) {
public ESExporter(Settings settings, Discovery discovery, ClusterName clusterName, Environment environment, Plugin marvelPlugin) {
super(settings);
this.discovery = discovery;
@@ -85,6 +90,25 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implements StatsExporter<ESExporter> {
timeout = (int) settings.getAsTime("es.timeout", new TimeValue(6000)).seconds();
kibanaIndex = settings.get("es.kibana_index", ".marvel-kibana");
String dashboardsBasePath = settings.get("es.upload.dashboards.path");
File dashboardsBase;
if (dashboardsBasePath != null) {
dashboardsBase = new File(dashboardsBasePath);
} else {
dashboardsBase = new File(new File(environment.pluginsFile(), marvelPlugin.name()),
"_site/app/dashboards/marvel".replace('/', File.separatorChar)
);
}
ArrayList<String> dashboardPaths = new ArrayList<String>();
for (String d : settings.getAsArray("es.upload.dashboards", new String[]{"overview.json", "cluster_pulse.json"})) {
dashboardPaths.add(new File(dashboardsBase, d).getAbsolutePath());
}
dashboardPathsToUpload = dashboardPaths.toArray(new String[dashboardPaths.size()]);
nodeStatsRenderer = new NodeStatsRenderer();
shardStatsRenderer = new ShardStatsRenderer();
indexStatsRenderer = new IndexStatsRenderer();
@@ -147,15 +171,17 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implements StatsExporter<ESExporter> {
private HttpURLConnection openExportingConnection() {
if (!checkedForIndexTemplate) {
if (!checkForIndexTemplate()) {
logger.debug("no template defined yet. skipping");
if (!checkedAndUploadedAllResources) {
try {
checkedAndUploadedAllResources = checkAndUploadAllResources();
} catch (ElasticSearchException e) {
logger.error("failed to upload critical resources, stopping export", e);
return null;
}
}
logger.trace("setting up an export connection");
HttpURLConnection conn = openConnection("POST", "/_bulk", XContentType.SMILE.restContentType());
HttpURLConnection conn = openConnection("POST", "_bulk", XContentType.SMILE.restContentType());
if (conn == null) {
logger.error("could not connect to any configured elasticsearch instances: [{}]", hosts);
}
@@ -244,7 +270,7 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implements StatsExporter<ESExporter> {
for (; hostIndex < hosts.length; hostIndex++) {
String host = hosts[hostIndex];
try {
URL templateUrl = new URL("http://" + host + uri);
URL templateUrl = new URL("http://" + host + "/" + uri);
HttpURLConnection conn = (HttpURLConnection) templateUrl.openConnection();
conn.setRequestMethod(method);
conn.setConnectTimeout(timeout);
@@ -276,58 +302,151 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implements StatsExporter<ESExporter> {
return null;
}
private boolean checkForIndexTemplate() {
/**
* Checks if resources such as index templates and dashboards already exist and, if not, uploads them.
* Any critical error that should prevent data exporting is communicated via an exception.
*
* @return true if all resources exist or are uploaded.
*/
private boolean checkAndUploadAllResources() {
boolean ret = checkAndUploadIndexTemplate();
for (String dashPath : dashboardPathsToUpload) {
ret = checkAndUploadDashboard(dashPath) && ret;
}
return ret;
}
private boolean checkAndUploadDashboard(String path) {
File dashboardFile = new File(path);
if (!dashboardFile.exists()) {
logger.warn("can't upload dashboard [{}] - file doesn't exist", path);
return true;
}
try {
String templateName = "marvel";
logger.debug("checking of target has template [{}]", templateName);
// DO HEAD REQUEST, when elasticsearch supports it
HttpURLConnection conn = openConnection("GET", "/_template/" + templateName);
if (conn == null) {
logger.error("Could not connect to any configured elasticsearch instances: [{}]", hosts);
return false;
byte[] dashboardBytes = Streams.copyToByteArray(dashboardFile);
XContentParser parser = XContentHelper.createParser(dashboardBytes, 0, dashboardBytes.length);
XContentParser.Token token = parser.nextToken();
if (token == null) {
throw new IOException("no data");
}
boolean hasTemplate = conn.getResponseCode() == 200;
// nothing there, lets create it
if (!hasTemplate) {
logger.debug("no template found in elasticsearch for [{}]. Adding...", templateName);
conn = openConnection("PUT", "/_template/" + templateName, XContentType.SMILE.restContentType());
OutputStream os = conn.getOutputStream();
XContentBuilder builder = XContentFactory.smileBuilder(os);
builder.startObject();
builder.field("template", ".marvel*");
builder.startObject("mappings").startObject("_default_");
builder.startArray("dynamic_templates").startObject().startObject("string_fields")
.field("match", "*")
.field("match_mapping_type", "string")
.startObject("mapping").field("index", "not_analyzed").endObject()
.endObject().endObject().endArray();
builder.endObject().endObject(); // mapping + root object.
builder.close();
os.close();
if (conn.getResponseCode() != 200) {
logConnectionError("error adding index template to elasticsearch", conn);
if (token != XContentParser.Token.START_OBJECT) {
throw new IOException("should start with an object");
}
String dashboardTitle = null;
String currentFieldName = null;
while (dashboardTitle == null && (token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
switch (token) {
case START_ARRAY:
parser.skipChildren();
break;
case START_OBJECT:
parser.skipChildren();
break;
case FIELD_NAME:
currentFieldName = parser.currentName();
break;
case VALUE_STRING:
if ("title".equals(currentFieldName)) {
dashboardTitle = parser.text();
}
break;
}
conn.getInputStream().close(); // close and release to connection pool.
}
checkedForIndexTemplate = true;
if (dashboardTitle == null) {
throw new IOException("failed to find dashboard title");
}
XContentBuilder builder = JsonXContent.contentBuilder();
builder.startObject();
builder.field("title", dashboardTitle);
builder.field("dashboard", new String(dashboardBytes, "UTF-8"));
builder.endObject();
return checkAndUpload(kibanaIndex, "dashboard", dashboardTitle, builder.bytes().toBytes());
} catch (IOException e) {
logger.error("error when checking/adding template to elasticsearch", e);
logger.error("error while checking/uploading dashboard [{}]", path, e);
return false;
}
}
private String urlEncode(String s) {
try {
return URLEncoder.encode(s, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new ElasticSearchException("failed to url encode [" + s + "]", e);
}
}
/**
* Checks whether a document already exists in ES and, if not, uploads it.
*
* @return true if the document exists or has been successfully uploaded.
*/
private boolean checkAndUpload(String index, String type, String id, byte[] bytes) throws IOException {
return checkAndUpload(urlEncode(index) + "/" + urlEncode(type) + "/" + urlEncode(id), bytes);
}
/**
* Checks whether a document already exists in ES and, if not, uploads it.
*
* @return true if the document exists or has been successfully uploaded.
*/
private boolean checkAndUpload(String path, byte[] bytes) throws IOException {
logger.debug("checking if target has [{}]", path);
HttpURLConnection conn = openConnection("HEAD", path);
if (conn == null) {
logger.error("Could not connect to any configured elasticsearch instances: [{}]", hosts);
return false;
}
boolean hasDoc = conn.getResponseCode() == 200;
// nothing there, lets create it
if (!hasDoc) {
logger.debug("no document found in elasticsearch for [{}]. Adding...", path);
conn = openConnection("PUT", path);
OutputStream os = conn.getOutputStream();
Streams.copy(bytes, os);
if (!(conn.getResponseCode() == 200 || conn.getResponseCode() == 201)) {
logConnectionError("error adding document to elasticsearch", conn);
} else {
hasDoc = true;
}
conn.getInputStream().close(); // close and release to connection pool.
}
return hasDoc;
}
private boolean checkAndUploadIndexTemplate() {
byte[] template;
try {
template = Streams.copyToBytesFromClasspath("/marvel_index_template.json");
} catch (IOException e) {
// throwing an exception to stop exporting process - we don't want to send data unless
// we put in the template for it.
throw new ElasticSearchException("failed to load marvel_index_template.json", e);
}
try {
return checkAndUpload("_template/marvel", template);
} catch (IOException e) {
logger.error("error when checking/adding index template", e);
return false;
}
return true;
}
private void logConnectionError(String msg, HttpURLConnection conn) {
InputStream inputStream = conn.getErrorStream();
java.util.Scanner s = new java.util.Scanner(inputStream, "UTF-8").useDelimiter("\\A");
String err = s.hasNext() ? s.next() : "";
String err = "";
if (inputStream != null) {
java.util.Scanner s = new java.util.Scanner(inputStream, "UTF-8").useDelimiter("\\A");
err = s.hasNext() ? s.next() : "";
}
try {
logger.error("{} response code [{} {}]. content: {}", msg, conn.getResponseCode(), conn.getResponseMessage(), err);
logger.error("{} response code [{} {}]. content: [{}]", msg, conn.getResponseCode(), conn.getResponseMessage(), err);
} catch (IOException e) {
logger.error("connection had an error while reporting the error. tough life.");
}
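Every resource above goes through the same check-then-upload pattern: a HEAD request to see whether the target document (or template) already exists, followed by a PUT of the raw bytes when it does not. Dashboards are wrapped as a {"title": ..., "dashboard": "<original JSON as a string>"} document and stored under <kibana index>/dashboard/<title> (the index defaults to .marvel-kibana). A minimal sketch of that flow against a single placeholder host; the real checkAndUpload() goes through openConnection(), which adds host failover and timeouts:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

class CheckAndUploadSketch {
    // Returns true if the resource already exists or was created; host and path are placeholders.
    static boolean checkAndUpload(String host, String path, byte[] doc) throws Exception {
        HttpURLConnection head = (HttpURLConnection) new URL("http://" + host + "/" + path).openConnection();
        head.setRequestMethod("HEAD");
        if (head.getResponseCode() == 200) {
            return true; // already present, nothing to upload
        }
        HttpURLConnection put = (HttpURLConnection) new URL("http://" + host + "/" + path).openConnection();
        put.setRequestMethod("PUT");
        put.setDoOutput(true);
        OutputStream os = put.getOutputStream();
        os.write(doc);
        os.close();
        int rc = put.getResponseCode();
        return rc == 200 || rc == 201; // 201 Created for a newly indexed document
    }
}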

@@ -0,0 +1,22 @@
{
"template": ".marvel*",
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1
},
"mappings": {
"_default_": {
"dynamic_templates": [
{
"string_fields": {
"match": "*",
"match_mapping_type": "string",
"mapping": {
"index": "not_analyzed"
}
}
}
]
}
}
}
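This file replaces the index template that was previously built inline with XContentBuilder: it keeps the not_analyzed mapping for every string field in .marvel* indices and adds explicit settings (one shard, one replica). checkAndUploadIndexTemplate() loads it from the plugin classpath and registers it at _template/marvel only when the HEAD check reports it missing. A minimal sketch of the loading step (class name illustrative; the PUT itself follows the check-and-upload flow sketched earlier):

import org.elasticsearch.common.io.Streams;

class TemplateLoadSketch {
    // Loads the bundled template exactly as checkAndUploadIndexTemplate() does;
    // the bytes are then PUT unchanged to _template/marvel when the template is absent.
    static byte[] loadMarvelTemplate() throws java.io.IOException {
        return Streams.copyToBytesFromClasspath("/marvel_index_template.json");
    }
}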