Make basic settings dynamically updatable.

The following currently support dynamic changes:
marvel.agent.interval  (also supports setting to -1 to disable exporting)
marvel.agent.indices
marvel.agent.exporter.es.hosts
marvel.agent.exporter.es.timeout

Also removed dashboard uploading code in es exporter as it is not needed.

Closes #20
This commit is contained in:
Boaz Leskes 2014-03-06 14:44:05 +01:00 committed by Chris Cowan
parent 3bfbfb2842
commit 31495f604e
2 changed files with 135 additions and 137 deletions

View file

@ -36,6 +36,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableSet;
@ -54,6 +56,7 @@ import org.elasticsearch.indices.InternalIndicesService;
import org.elasticsearch.marvel.agent.event.*;
import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.node.settings.NodeSettingsService;
import java.util.ArrayList;
import java.util.Collection;
@ -63,12 +66,12 @@ import java.util.concurrent.BlockingQueue;
import static org.elasticsearch.common.collect.Lists.newArrayList;
public class AgentService extends AbstractLifecycleComponent<AgentService> {
public class AgentService extends AbstractLifecycleComponent<AgentService> implements NodeSettingsService.Listener {
public static final String SETTINGS_INTERVAL = "interval";
public static final String SETTINGS_INDICES = "indices";
public static final String SETTINGS_ENABLED = "enabled";
public static final String SETTINGS_SHARD_STATS_ENABLED = "shard_stats.enabled";
public static final String SETTINGS_INTERVAL = "marvel.agent.interval";
public static final String SETTINGS_INDICES = "marvel.agent.indices";
public static final String SETTINGS_ENABLED = "marvel.agent.enabled";
public static final String SETTINGS_SHARD_STATS_ENABLED = "marvel.agent.shard_stats.enabled";
private final InternalIndicesService indicesService;
private final NodeService nodeService;
@ -79,14 +82,14 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
private final IndicesLifecycle.Listener indicesLifeCycleListener;
private final ClusterStateListener clusterStateEventListener;
private volatile ExportingWorker exp;
private volatile Thread thread;
private final TimeValue interval;
private volatile ExportingWorker exportingWorker;
private volatile Thread workerThread;
private volatile long samplingInterval;
volatile private String[] indicesToExport = Strings.EMPTY_ARRAY;
volatile private boolean exportShardStats;
private Collection<Exporter> exporters;
volatile private String[] indicesToExport = Strings.EMPTY_ARRAY;
volatile private boolean exportShardStats;
private final BlockingQueue<Event> pendingEventsQueue;
@ -94,14 +97,16 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
public AgentService(Settings settings, IndicesService indicesService,
NodeService nodeService, ClusterService clusterService,
Client client, ClusterName clusterName,
NodeSettingsService nodeSettingsService,
@ClusterDynamicSettings DynamicSettings dynamicSettings,
Set<Exporter> exporters) {
super(settings);
this.indicesService = (InternalIndicesService) indicesService;
this.clusterService = clusterService;
this.nodeService = nodeService;
this.interval = componentSettings.getAsTime(SETTINGS_INTERVAL, TimeValue.timeValueSeconds(10));
this.indicesToExport = componentSettings.getAsArray(SETTINGS_INDICES, this.indicesToExport, true);
this.exportShardStats = componentSettings.getAsBoolean(SETTINGS_SHARD_STATS_ENABLED, false);
this.samplingInterval = settings.getAsTime(SETTINGS_INTERVAL, TimeValue.timeValueSeconds(10)).millis();
this.indicesToExport = settings.getAsArray(SETTINGS_INDICES, this.indicesToExport, true);
this.exportShardStats = settings.getAsBoolean(SETTINGS_SHARD_STATS_ENABLED, false);
this.client = client;
this.clusterName = clusterName.value();
@ -109,12 +114,37 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
clusterStateEventListener = new ClusterStateListener();
pendingEventsQueue = ConcurrentCollections.newBlockingQueue();
if (componentSettings.getAsBoolean(SETTINGS_ENABLED, true)) {
if (settings.getAsBoolean(SETTINGS_ENABLED, true)) {
this.exporters = ImmutableSet.copyOf(exporters);
} else {
this.exporters = ImmutableSet.of();
logger.info("collecting disabled by settings");
}
nodeSettingsService.addListener(this);
dynamicSettings.addDynamicSetting(SETTINGS_INTERVAL);
dynamicSettings.addDynamicSetting(SETTINGS_INDICES + ".*"); // array settings
}
protected void applyIntervalSettings() {
if (samplingInterval <= 0) {
logger.info("data sampling is disabled due to interval settings [{}]", samplingInterval);
if (workerThread != null) {
// notify worker to stop on its leisure, not to disturb an exporting operation
exportingWorker.closed = true;
exportingWorker = null;
workerThread = null;
}
} else if (workerThread == null || !workerThread.isAlive()) {
exportingWorker = new ExportingWorker();
workerThread = new Thread(exportingWorker, EsExecutors.threadName(settings, "marvel.exporters"));
workerThread.setDaemon(true);
workerThread.start();
}
}
@Override
@ -125,13 +155,10 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
for (Exporter e : exporters)
e.start();
this.exp = new ExportingWorker();
this.thread = new Thread(exp, EsExecutors.threadName(settings, "marvel.exporters"));
this.thread.setDaemon(true);
this.thread.start();
indicesService.indicesLifecycle().addListener(indicesLifeCycleListener);
clusterService.addLast(clusterStateEventListener);
applyIntervalSettings();
}
@Override
@ -139,19 +166,20 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
if (exporters.size() == 0) {
return;
}
this.exp.closed = true;
this.thread.interrupt();
try {
this.thread.join(60000);
} catch (InterruptedException e) {
// we don't care...
if (workerThread != null && workerThread.isAlive()) {
exportingWorker.closed = true;
workerThread.interrupt();
try {
workerThread.join(60000);
} catch (InterruptedException e) {
// we don't care...
}
}
for (Exporter e : exporters)
e.stop();
indicesService.indicesLifecycle().removeListener(indicesLifeCycleListener);
clusterService.remove(clusterStateEventListener);
}
@Override
@ -160,16 +188,32 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
e.close();
}
@Override
public void onRefreshSettings(Settings settings) {
TimeValue newSamplingInterval = settings.getAsTime(SETTINGS_INTERVAL, null);
if (newSamplingInterval != null) {
logger.info("sampling interval updated to [{}]", newSamplingInterval);
samplingInterval = newSamplingInterval.millis();
applyIntervalSettings();
}
String[] indices = settings.getAsArray(SETTINGS_INDICES, null, true);
if (indices != null) {
logger.info("sampling indices updated to [{}]", Strings.arrayToCommaDelimitedString(indices));
indicesToExport = indices;
}
}
class ExportingWorker implements Runnable {
volatile boolean closed;
volatile boolean closed = false;
@Override
public void run() {
while (!closed) {
// sleep first to allow node to complete initialization before collecting the first start
try {
Thread.sleep(interval.millis());
Thread.sleep(samplingInterval);
if (closed) {
continue;
}
@ -289,9 +333,15 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (samplingInterval <= 0) {
// ignore as we're not sampling
return;
}
if (!event.localNodeMaster()) {
return;
}
// only collect if i'm master.
long timestamp = System.currentTimeMillis();
@ -463,7 +513,10 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
@Override
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {
if (samplingInterval <= 0) {
// ignore as we're not sampling
return;
}
DiscoveryNode relocatingNode = null;
if (indexShard.routingEntry() != null) {
if (indexShard.routingEntry().relocatingNodeId() != null) {

View file

@ -28,7 +28,10 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
@ -40,37 +43,40 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.smile.SmileXContent;
import org.elasticsearch.env.Environment;
import org.elasticsearch.marvel.agent.Plugin;
import org.elasticsearch.marvel.agent.Utils;
import org.elasticsearch.marvel.agent.event.Event;
import org.elasticsearch.node.settings.NodeSettingsService;
import java.io.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Map;
public class ESExporter extends AbstractLifecycleComponent<ESExporter> implements Exporter<ESExporter> {
public class ESExporter extends AbstractLifecycleComponent<ESExporter> implements Exporter<ESExporter>, NodeSettingsService.Listener {
private static final String SETTINGS_PREFIX = "marvel.agent.exporter.es.";
public static final String SETTINGS_HOSTS = SETTINGS_PREFIX + "hosts";
public static final String SETTINGS_INDEX_PREFIX = SETTINGS_PREFIX + "index.prefix";
public static final String SETTINGS_INDEX_TIME_FORMAT = SETTINGS_PREFIX + "index.timeformat";
public static final String SETTINGS_TIMEOUT = SETTINGS_PREFIX + "timeout";
volatile String[] hosts;
final String indexPrefix;
final DateTimeFormatter indexTimeFormatter;
final int timeout;
// index to upload dashboards into.
final String kibanaIndex;
final String[] dashboardPathsToUpload;
volatile int timeout;
final ClusterService clusterService;
final ClusterName clusterName;
public final static DateTimeFormatter defaultDatePrinter = Joda.forPattern("date_time").printer();
volatile boolean checkedAndUploadedAllResources = false;
volatile boolean checkedAndUploadedIndexTemplate = false;
final NodeStatsRenderer nodeStatsRenderer;
final ShardStatsRenderer shardStatsRenderer;
@ -83,38 +89,20 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
Thread keepAliveThread;
@Inject
public ESExporter(Settings settings, ClusterService clusterService, ClusterName clusterName, Environment environment, Plugin marvelPlugin) {
public ESExporter(Settings settings, ClusterService clusterService, ClusterName clusterName,
@ClusterDynamicSettings DynamicSettings dynamicSettings, NodeSettingsService nodeSettingsService) {
super(settings);
this.clusterService = clusterService;
this.clusterName = clusterName;
hosts = componentSettings.getAsArray("es.hosts", new String[]{"localhost:9200"});
indexPrefix = componentSettings.get("es.index.prefix", ".marvel");
String indexTimeFormat = componentSettings.get("es.index.timeformat", "YYYY.MM.dd");
hosts = settings.getAsArray(SETTINGS_HOSTS, new String[]{"localhost:9200"});
indexPrefix = settings.get(SETTINGS_INDEX_PREFIX, ".marvel");
String indexTimeFormat = settings.get(SETTINGS_INDEX_TIME_FORMAT, "YYYY.MM.dd");
indexTimeFormatter = DateTimeFormat.forPattern(indexTimeFormat).withZoneUTC();
timeout = (int) componentSettings.getAsTime("es.timeout", new TimeValue(6000)).seconds();
kibanaIndex = componentSettings.get("es.kibana_index", ".marvel-kibana");
String dashboardsBasePath = componentSettings.get("es.upload.dashboards.path");
File dashboardsBase;
if (dashboardsBasePath != null) {
dashboardsBase = new File(dashboardsBasePath);
} else {
dashboardsBase = new File(new File(environment.pluginsFile(), marvelPlugin.name()),
"_site/app/dashboards/marvel".replace('/', File.separatorChar)
);
}
ArrayList<String> dashboardPaths = new ArrayList<String>();
for (String d : componentSettings.getAsArray("es.upload.dashboards", new String[]{})) {
dashboardPaths.add(new File(dashboardsBase, d).getAbsolutePath());
}
dashboardPathsToUpload = dashboardPaths.toArray(new String[dashboardPaths.size()]);
timeout = (int) settings.getAsTime(SETTINGS_TIMEOUT, new TimeValue(6000)).seconds();
nodeStatsRenderer = new NodeStatsRenderer();
shardStatsRenderer = new ShardStatsRenderer();
@ -125,6 +113,10 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
keepAliveWorker = new ConnectionKeepAliveWorker();
dynamicSettings.addDynamicSetting(SETTINGS_HOSTS + ".*");
dynamicSettings.addDynamicSetting(SETTINGS_TIMEOUT);
nodeSettingsService.addListener(this);
logger.debug("initialized with targets: {}, index prefix [{}], index time format [{}]", hosts, indexPrefix, indexTimeFormat);
}
@ -179,11 +171,11 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
private HttpURLConnection openExportingConnection() {
if (!checkedAndUploadedAllResources) {
if (!checkedAndUploadedIndexTemplate) {
try {
checkedAndUploadedAllResources = checkAndUploadAllResources();
checkedAndUploadedIndexTemplate = checkAndUploadIndexTemplate();
} catch (RuntimeException e) {
logger.error("failed to upload critical resources, stopping export", e);
logger.error("failed to upload index template, stopping export", e);
return null;
}
}
@ -333,10 +325,10 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
}
} finally {
if (hostIndex > 0 && hostIndex < hosts.length) {
logger.debug("moving [{}] failed hosts to the end of the list", hostIndex + 1);
logger.debug("moving [{}] failed hosts to the end of the list", hostIndex);
String[] newHosts = new String[hosts.length];
System.arraycopy(hosts, hostIndex, newHosts, 0, hosts.length - hostIndex);
System.arraycopy(hosts, 0, newHosts, hosts.length - hostIndex - 1, hostIndex + 1);
System.arraycopy(hosts, 0, newHosts, hosts.length - hostIndex, hostIndex);
hosts = newHosts;
logger.debug("preferred target host is now [{}]", hosts[0]);
}
@ -345,74 +337,6 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
return null;
}
/**
* Checks if resources such as index templates and dashboards already exist and if not uploads them/
* Any critical error that should prevent data exporting is communicated via an exception.
*
* @return true if all resources exist or are uploaded.
*/
private boolean checkAndUploadAllResources() {
boolean ret = checkAndUploadIndexTemplate();
for (String dashPath : dashboardPathsToUpload) {
ret = checkAndUploadDashboard(dashPath) && ret;
}
return ret;
}
private boolean checkAndUploadDashboard(String path) {
logger.debug("checking/uploading [{}]", path);
File dashboardFile = new File(path);
if (!dashboardFile.exists()) {
logger.warn("can't upload dashboard [{}] - file doesn't exist", path);
return true;
}
try {
byte[] dashboardBytes = Streams.copyToByteArray(dashboardFile);
XContentParser parser = XContentHelper.createParser(dashboardBytes, 0, dashboardBytes.length);
XContentParser.Token token = parser.nextToken();
if (token == null) {
throw new IOException("no data");
}
if (token != XContentParser.Token.START_OBJECT) {
throw new IOException("should start with an object");
}
String dashboardTitle = null;
String currentFieldName = null;
while (dashboardTitle == null && (token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
switch (token) {
case START_ARRAY:
parser.skipChildren();
break;
case START_OBJECT:
parser.skipChildren();
break;
case FIELD_NAME:
currentFieldName = parser.currentName();
break;
case VALUE_STRING:
if ("title".equals(currentFieldName)) {
dashboardTitle = parser.text();
}
break;
}
}
if (dashboardTitle == null) {
throw new IOException("failed to find dashboard title");
}
XContentBuilder builder = JsonXContent.contentBuilder();
builder.startObject();
builder.field("title", dashboardTitle);
builder.field("dashboard", new String(dashboardBytes, "UTF-8"));
builder.endObject();
return checkAndUpload(kibanaIndex, "dashboard", dashboardTitle, builder.bytes().toBytes());
} catch (IOException e) {
logger.error("error while checking/uploading dashboard [{}]", path, e);
return false;
}
}
private String urlEncode(String s) {
try {
return URLEncoder.encode(s, "UTF-8");
@ -465,6 +389,12 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
return hasDoc;
}
/**
* Checks if the index templates already exist and if not uploads it
* Any critical error that should prevent data exporting is communicated via an exception.
*
* @return true if template exists or was uploaded.
*/
private boolean checkAndUploadIndexTemplate() {
byte[] template;
try {
@ -497,6 +427,21 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
}
}
@Override
public void onRefreshSettings(Settings settings) {
TimeValue newTimeout = settings.getAsTime(SETTINGS_TIMEOUT, null);
if (newTimeout != null) {
logger.info("connection timeout set to [{}]", newTimeout);
timeout = (int) newTimeout.seconds();
}
String[] newHosts = settings.getAsArray(SETTINGS_HOSTS, null);
if (newHosts != null) {
logger.info("hosts set to [{}]", Strings.arrayToCommaDelimitedString(newHosts));
this.hosts = newHosts;
}
}
interface MultiXContentRenderer {
int length();