mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
Merge remote-tracking branch 'upstream/master' into feature/issue-126-analytics
This commit is contained in:
commit
efb9fcabc4
12 changed files with 414 additions and 162 deletions
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.marvel.agent;
|
||||
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.inject.multibindings.Multibinder;
|
||||
import org.elasticsearch.marvel.agent.exporter.ESExporter;
|
||||
import org.elasticsearch.marvel.agent.exporter.Exporter;
|
||||
|
||||
public class AgentModule extends AbstractModule {
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
Multibinder<Exporter> multibinder = Multibinder.newSetBinder(binder(), Exporter.class);
|
||||
multibinder.addBinding().to(ESExporter.class);
|
||||
bind(AgentService.class).asEagerSingleton();
|
||||
}
|
||||
}
|
|
@ -36,6 +36,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
|
||||
import org.elasticsearch.cluster.settings.DynamicSettings;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.ImmutableSet;
|
||||
|
@ -45,7 +47,6 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.index.service.IndexService;
|
||||
import org.elasticsearch.index.shard.IndexShardState;
|
||||
import org.elasticsearch.index.shard.service.IndexShard;
|
||||
|
@ -53,18 +54,24 @@ import org.elasticsearch.indices.IndicesLifecycle;
|
|||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.InternalIndicesService;
|
||||
import org.elasticsearch.marvel.agent.event.*;
|
||||
import org.elasticsearch.marvel.agent.exporter.ESExporter;
|
||||
import org.elasticsearch.marvel.agent.exporter.Exporter;
|
||||
import org.elasticsearch.node.service.NodeService;
|
||||
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
||||
import static org.elasticsearch.common.collect.Lists.newArrayList;
|
||||
|
||||
public class AgentService extends AbstractLifecycleComponent<AgentService> {
|
||||
public class AgentService extends AbstractLifecycleComponent<AgentService> implements NodeSettingsService.Listener {
|
||||
|
||||
public static final String SETTINGS_INTERVAL = "marvel.agent.interval";
|
||||
public static final String SETTINGS_INDICES = "marvel.agent.indices";
|
||||
public static final String SETTINGS_ENABLED = "marvel.agent.enabled";
|
||||
public static final String SETTINGS_SHARD_STATS_ENABLED = "marvel.agent.shard_stats.enabled";
|
||||
|
||||
private final InternalIndicesService indicesService;
|
||||
private final NodeService nodeService;
|
||||
|
@ -75,13 +82,14 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
|
|||
private final IndicesLifecycle.Listener indicesLifeCycleListener;
|
||||
private final ClusterStateListener clusterStateEventListener;
|
||||
|
||||
private volatile ExportingWorker exp;
|
||||
private volatile Thread thread;
|
||||
private final TimeValue interval;
|
||||
private volatile ExportingWorker exportingWorker;
|
||||
private volatile Thread workerThread;
|
||||
private volatile long samplingInterval;
|
||||
volatile private String[] indicesToExport = Strings.EMPTY_ARRAY;
|
||||
volatile private boolean exportShardStats;
|
||||
|
||||
private Collection<Exporter> exporters;
|
||||
|
||||
private String[] indicesToExport = Strings.EMPTY_ARRAY;
|
||||
|
||||
private final BlockingQueue<Event> pendingEventsQueue;
|
||||
|
||||
|
@ -89,13 +97,16 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
|
|||
public AgentService(Settings settings, IndicesService indicesService,
|
||||
NodeService nodeService, ClusterService clusterService,
|
||||
Client client, ClusterName clusterName,
|
||||
Environment environment, Plugin marvelPlugin) {
|
||||
NodeSettingsService nodeSettingsService,
|
||||
@ClusterDynamicSettings DynamicSettings dynamicSettings,
|
||||
Set<Exporter> exporters) {
|
||||
super(settings);
|
||||
this.indicesService = (InternalIndicesService) indicesService;
|
||||
this.clusterService = clusterService;
|
||||
this.nodeService = nodeService;
|
||||
this.interval = componentSettings.getAsTime("interval", TimeValue.timeValueSeconds(5));
|
||||
this.indicesToExport = componentSettings.getAsArray("indices", this.indicesToExport, true);
|
||||
this.samplingInterval = settings.getAsTime(SETTINGS_INTERVAL, TimeValue.timeValueSeconds(10)).millis();
|
||||
this.indicesToExport = settings.getAsArray(SETTINGS_INDICES, this.indicesToExport, true);
|
||||
this.exportShardStats = settings.getAsBoolean(SETTINGS_SHARD_STATS_ENABLED, false);
|
||||
this.client = client;
|
||||
this.clusterName = clusterName.value();
|
||||
|
||||
|
@ -103,13 +114,37 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
|
|||
clusterStateEventListener = new ClusterStateListener();
|
||||
pendingEventsQueue = ConcurrentCollections.newBlockingQueue();
|
||||
|
||||
if (componentSettings.getAsBoolean("enabled", true)) {
|
||||
Exporter esExporter = new ESExporter(settings.getComponentSettings(ESExporter.class), clusterService, clusterName, environment, marvelPlugin);
|
||||
this.exporters = ImmutableSet.of(esExporter);
|
||||
if (settings.getAsBoolean(SETTINGS_ENABLED, true)) {
|
||||
this.exporters = ImmutableSet.copyOf(exporters);
|
||||
} else {
|
||||
this.exporters = ImmutableSet.of();
|
||||
logger.info("collecting disabled by settings");
|
||||
}
|
||||
|
||||
nodeSettingsService.addListener(this);
|
||||
dynamicSettings.addDynamicSetting(SETTINGS_INTERVAL);
|
||||
dynamicSettings.addDynamicSetting(SETTINGS_INDICES + ".*"); // array settings
|
||||
}
|
||||
|
||||
protected void applyIntervalSettings() {
|
||||
if (samplingInterval <= 0) {
|
||||
logger.info("data sampling is disabled due to interval settings [{}]", samplingInterval);
|
||||
if (workerThread != null) {
|
||||
|
||||
// notify worker to stop on its leisure, not to disturb an exporting operation
|
||||
exportingWorker.closed = true;
|
||||
|
||||
exportingWorker = null;
|
||||
workerThread = null;
|
||||
}
|
||||
} else if (workerThread == null || !workerThread.isAlive()) {
|
||||
|
||||
exportingWorker = new ExportingWorker();
|
||||
workerThread = new Thread(exportingWorker, EsExecutors.threadName(settings, "marvel.exporters"));
|
||||
workerThread.setDaemon(true);
|
||||
workerThread.start();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -120,13 +155,10 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
|
|||
for (Exporter e : exporters)
|
||||
e.start();
|
||||
|
||||
this.exp = new ExportingWorker();
|
||||
this.thread = new Thread(exp, EsExecutors.threadName(settings, "marvel.exporters"));
|
||||
this.thread.setDaemon(true);
|
||||
this.thread.start();
|
||||
|
||||
indicesService.indicesLifecycle().addListener(indicesLifeCycleListener);
|
||||
clusterService.addLast(clusterStateEventListener);
|
||||
|
||||
applyIntervalSettings();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -134,19 +166,20 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
|
|||
if (exporters.size() == 0) {
|
||||
return;
|
||||
}
|
||||
this.exp.closed = true;
|
||||
this.thread.interrupt();
|
||||
try {
|
||||
this.thread.join(60000);
|
||||
} catch (InterruptedException e) {
|
||||
// we don't care...
|
||||
if (workerThread != null && workerThread.isAlive()) {
|
||||
exportingWorker.closed = true;
|
||||
workerThread.interrupt();
|
||||
try {
|
||||
workerThread.join(60000);
|
||||
} catch (InterruptedException e) {
|
||||
// we don't care...
|
||||
}
|
||||
}
|
||||
for (Exporter e : exporters)
|
||||
e.stop();
|
||||
|
||||
indicesService.indicesLifecycle().removeListener(indicesLifeCycleListener);
|
||||
clusterService.remove(clusterStateEventListener);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -155,23 +188,41 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
|
|||
e.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRefreshSettings(Settings settings) {
|
||||
TimeValue newSamplingInterval = settings.getAsTime(SETTINGS_INTERVAL, null);
|
||||
if (newSamplingInterval != null) {
|
||||
logger.info("sampling interval updated to [{}]", newSamplingInterval);
|
||||
samplingInterval = newSamplingInterval.millis();
|
||||
applyIntervalSettings();
|
||||
}
|
||||
|
||||
String[] indices = settings.getAsArray(SETTINGS_INDICES, null, true);
|
||||
if (indices != null) {
|
||||
logger.info("sampling indices updated to [{}]", Strings.arrayToCommaDelimitedString(indices));
|
||||
indicesToExport = indices;
|
||||
}
|
||||
}
|
||||
|
||||
class ExportingWorker implements Runnable {
|
||||
|
||||
volatile boolean closed;
|
||||
volatile boolean closed = false;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (!closed) {
|
||||
// sleep first to allow node to complete initialization before collecting the first start
|
||||
try {
|
||||
Thread.sleep(interval.millis());
|
||||
Thread.sleep(samplingInterval);
|
||||
if (closed) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// do the actual export..., go over the actual exporters list and...
|
||||
exportNodeStats();
|
||||
exportShardStats();
|
||||
if (exportShardStats) {
|
||||
exportShardStats();
|
||||
}
|
||||
exportEvents();
|
||||
|
||||
if (clusterService.state().nodes().localNodeMaster()) {
|
||||
|
@ -195,7 +246,7 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
|
|||
|
||||
private void exportIndicesStats() {
|
||||
logger.trace("local node is master, exporting indices stats");
|
||||
IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats().all().get();
|
||||
IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats().all().setIndices(indicesToExport).get();
|
||||
for (Exporter e : exporters) {
|
||||
try {
|
||||
e.exportIndicesStats(indicesStatsResponse);
|
||||
|
@ -282,9 +333,15 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
|
|||
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (samplingInterval <= 0) {
|
||||
// ignore as we're not sampling
|
||||
return;
|
||||
}
|
||||
|
||||
if (!event.localNodeMaster()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// only collect if i'm master.
|
||||
long timestamp = System.currentTimeMillis();
|
||||
|
||||
|
@ -456,7 +513,10 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
|
|||
|
||||
@Override
|
||||
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {
|
||||
|
||||
if (samplingInterval <= 0) {
|
||||
// ignore as we're not sampling
|
||||
return;
|
||||
}
|
||||
DiscoveryNode relocatingNode = null;
|
||||
if (indexShard.routingEntry() != null) {
|
||||
if (indexShard.routingEntry().relocatingNodeId() != null) {
|
||||
|
|
|
@ -22,7 +22,6 @@ package org.elasticsearch.marvel.agent;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.collect.ImmutableList;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.inject.Module;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
|
@ -71,14 +70,7 @@ public class Plugin extends AbstractPlugin {
|
|||
if (!enabled) {
|
||||
return ImmutableList.of();
|
||||
}
|
||||
Module m = new AbstractModule() {
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
bind(AgentService.class).asEagerSingleton();
|
||||
}
|
||||
};
|
||||
return ImmutableList.of(m);
|
||||
return ImmutableList.of((Module) new AgentModule());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -28,9 +28,13 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
|||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
|
||||
import org.elasticsearch.cluster.settings.DynamicSettings;
|
||||
import org.elasticsearch.common.Base64;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.joda.Joda;
|
||||
import org.elasticsearch.common.joda.time.format.DateTimeFormat;
|
||||
|
@ -39,37 +43,40 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.xcontent.*;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.common.xcontent.smile.SmileXContent;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.marvel.agent.Plugin;
|
||||
import org.elasticsearch.marvel.agent.Utils;
|
||||
import org.elasticsearch.marvel.agent.event.Event;
|
||||
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||
|
||||
import java.io.*;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
import java.net.URLEncoder;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
|
||||
public class ESExporter extends AbstractLifecycleComponent<ESExporter> implements Exporter<ESExporter> {
|
||||
public class ESExporter extends AbstractLifecycleComponent<ESExporter> implements Exporter<ESExporter>, NodeSettingsService.Listener {
|
||||
|
||||
private static final String SETTINGS_PREFIX = "marvel.agent.exporter.es.";
|
||||
public static final String SETTINGS_HOSTS = SETTINGS_PREFIX + "hosts";
|
||||
public static final String SETTINGS_INDEX_PREFIX = SETTINGS_PREFIX + "index.prefix";
|
||||
public static final String SETTINGS_INDEX_TIME_FORMAT = SETTINGS_PREFIX + "index.timeformat";
|
||||
public static final String SETTINGS_TIMEOUT = SETTINGS_PREFIX + "timeout";
|
||||
|
||||
volatile String[] hosts;
|
||||
final String indexPrefix;
|
||||
final DateTimeFormatter indexTimeFormatter;
|
||||
final int timeout;
|
||||
|
||||
// index to upload dashboards into.
|
||||
final String kibanaIndex;
|
||||
final String[] dashboardPathsToUpload;
|
||||
volatile int timeout;
|
||||
|
||||
final ClusterService clusterService;
|
||||
final ClusterName clusterName;
|
||||
|
||||
public final static DateTimeFormatter defaultDatePrinter = Joda.forPattern("date_time").printer();
|
||||
|
||||
volatile boolean checkedAndUploadedAllResources = false;
|
||||
volatile boolean checkedAndUploadedIndexTemplate = false;
|
||||
|
||||
final NodeStatsRenderer nodeStatsRenderer;
|
||||
final ShardStatsRenderer shardStatsRenderer;
|
||||
|
@ -81,38 +88,21 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
|||
ConnectionKeepAliveWorker keepAliveWorker;
|
||||
Thread keepAliveThread;
|
||||
|
||||
public ESExporter(Settings settings, ClusterService clusterService, ClusterName clusterName, Environment environment, Plugin marvelPlugin) {
|
||||
@Inject
|
||||
public ESExporter(Settings settings, ClusterService clusterService, ClusterName clusterName,
|
||||
@ClusterDynamicSettings DynamicSettings dynamicSettings, NodeSettingsService nodeSettingsService) {
|
||||
super(settings);
|
||||
|
||||
this.clusterService = clusterService;
|
||||
|
||||
this.clusterName = clusterName;
|
||||
|
||||
hosts = settings.getAsArray("es.hosts", new String[]{"localhost:9200"});
|
||||
indexPrefix = settings.get("es.index.prefix", ".marvel");
|
||||
String indexTimeFormat = settings.get("es.index.timeformat", "YYYY.MM.dd");
|
||||
hosts = settings.getAsArray(SETTINGS_HOSTS, new String[]{"localhost:9200"});
|
||||
indexPrefix = settings.get(SETTINGS_INDEX_PREFIX, ".marvel");
|
||||
String indexTimeFormat = settings.get(SETTINGS_INDEX_TIME_FORMAT, "YYYY.MM.dd");
|
||||
indexTimeFormatter = DateTimeFormat.forPattern(indexTimeFormat).withZoneUTC();
|
||||
|
||||
timeout = (int) settings.getAsTime("es.timeout", new TimeValue(6000)).seconds();
|
||||
|
||||
kibanaIndex = settings.get("es.kibana_index", ".marvel-kibana");
|
||||
|
||||
|
||||
String dashboardsBasePath = settings.get("es.upload.dashboards.path");
|
||||
File dashboardsBase;
|
||||
if (dashboardsBasePath != null) {
|
||||
dashboardsBase = new File(dashboardsBasePath);
|
||||
} else {
|
||||
dashboardsBase = new File(new File(environment.pluginsFile(), marvelPlugin.name()),
|
||||
"_site/app/dashboards/marvel".replace('/', File.separatorChar)
|
||||
);
|
||||
}
|
||||
ArrayList<String> dashboardPaths = new ArrayList<String>();
|
||||
for (String d : settings.getAsArray("es.upload.dashboards", new String[]{})) {
|
||||
dashboardPaths.add(new File(dashboardsBase, d).getAbsolutePath());
|
||||
}
|
||||
|
||||
dashboardPathsToUpload = dashboardPaths.toArray(new String[dashboardPaths.size()]);
|
||||
timeout = (int) settings.getAsTime(SETTINGS_TIMEOUT, new TimeValue(6000)).seconds();
|
||||
|
||||
nodeStatsRenderer = new NodeStatsRenderer();
|
||||
shardStatsRenderer = new ShardStatsRenderer();
|
||||
|
@ -123,6 +113,10 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
|||
|
||||
keepAliveWorker = new ConnectionKeepAliveWorker();
|
||||
|
||||
dynamicSettings.addDynamicSetting(SETTINGS_HOSTS + ".*");
|
||||
dynamicSettings.addDynamicSetting(SETTINGS_TIMEOUT);
|
||||
nodeSettingsService.addListener(this);
|
||||
|
||||
logger.debug("initialized with targets: {}, index prefix [{}], index time format [{}]", hosts, indexPrefix, indexTimeFormat);
|
||||
}
|
||||
|
||||
|
@ -177,11 +171,11 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
|||
|
||||
|
||||
private HttpURLConnection openExportingConnection() {
|
||||
if (!checkedAndUploadedAllResources) {
|
||||
if (!checkedAndUploadedIndexTemplate) {
|
||||
try {
|
||||
checkedAndUploadedAllResources = checkAndUploadAllResources();
|
||||
checkedAndUploadedIndexTemplate = checkAndUploadIndexTemplate();
|
||||
} catch (RuntimeException e) {
|
||||
logger.error("failed to upload critical resources, stopping export", e);
|
||||
logger.error("failed to upload index template, stopping export", e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -331,10 +325,10 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
|||
}
|
||||
} finally {
|
||||
if (hostIndex > 0 && hostIndex < hosts.length) {
|
||||
logger.debug("moving [{}] failed hosts to the end of the list", hostIndex + 1);
|
||||
logger.debug("moving [{}] failed hosts to the end of the list", hostIndex);
|
||||
String[] newHosts = new String[hosts.length];
|
||||
System.arraycopy(hosts, hostIndex, newHosts, 0, hosts.length - hostIndex);
|
||||
System.arraycopy(hosts, 0, newHosts, hosts.length - hostIndex - 1, hostIndex + 1);
|
||||
System.arraycopy(hosts, 0, newHosts, hosts.length - hostIndex, hostIndex);
|
||||
hosts = newHosts;
|
||||
logger.debug("preferred target host is now [{}]", hosts[0]);
|
||||
}
|
||||
|
@ -343,74 +337,6 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
|||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if resources such as index templates and dashboards already exist and if not uploads them/
|
||||
* Any critical error that should prevent data exporting is communicated via an exception.
|
||||
*
|
||||
* @return true if all resources exist or are uploaded.
|
||||
*/
|
||||
private boolean checkAndUploadAllResources() {
|
||||
boolean ret = checkAndUploadIndexTemplate();
|
||||
for (String dashPath : dashboardPathsToUpload) {
|
||||
ret = checkAndUploadDashboard(dashPath) && ret;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
private boolean checkAndUploadDashboard(String path) {
|
||||
logger.debug("checking/uploading [{}]", path);
|
||||
File dashboardFile = new File(path);
|
||||
if (!dashboardFile.exists()) {
|
||||
logger.warn("can't upload dashboard [{}] - file doesn't exist", path);
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
byte[] dashboardBytes = Streams.copyToByteArray(dashboardFile);
|
||||
XContentParser parser = XContentHelper.createParser(dashboardBytes, 0, dashboardBytes.length);
|
||||
XContentParser.Token token = parser.nextToken();
|
||||
if (token == null) {
|
||||
throw new IOException("no data");
|
||||
}
|
||||
if (token != XContentParser.Token.START_OBJECT) {
|
||||
throw new IOException("should start with an object");
|
||||
}
|
||||
String dashboardTitle = null;
|
||||
String currentFieldName = null;
|
||||
while (dashboardTitle == null && (token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
switch (token) {
|
||||
case START_ARRAY:
|
||||
parser.skipChildren();
|
||||
break;
|
||||
case START_OBJECT:
|
||||
parser.skipChildren();
|
||||
break;
|
||||
case FIELD_NAME:
|
||||
currentFieldName = parser.currentName();
|
||||
break;
|
||||
case VALUE_STRING:
|
||||
if ("title".equals(currentFieldName)) {
|
||||
dashboardTitle = parser.text();
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (dashboardTitle == null) {
|
||||
throw new IOException("failed to find dashboard title");
|
||||
}
|
||||
|
||||
XContentBuilder builder = JsonXContent.contentBuilder();
|
||||
builder.startObject();
|
||||
builder.field("title", dashboardTitle);
|
||||
builder.field("dashboard", new String(dashboardBytes, "UTF-8"));
|
||||
builder.endObject();
|
||||
|
||||
return checkAndUpload(kibanaIndex, "dashboard", dashboardTitle, builder.bytes().toBytes());
|
||||
} catch (IOException e) {
|
||||
logger.error("error while checking/uploading dashboard [{}]", path, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private String urlEncode(String s) {
|
||||
try {
|
||||
return URLEncoder.encode(s, "UTF-8");
|
||||
|
@ -463,6 +389,12 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
|||
return hasDoc;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the index templates already exist and if not uploads it
|
||||
* Any critical error that should prevent data exporting is communicated via an exception.
|
||||
*
|
||||
* @return true if template exists or was uploaded.
|
||||
*/
|
||||
private boolean checkAndUploadIndexTemplate() {
|
||||
byte[] template;
|
||||
try {
|
||||
|
@ -495,6 +427,21 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRefreshSettings(Settings settings) {
|
||||
TimeValue newTimeout = settings.getAsTime(SETTINGS_TIMEOUT, null);
|
||||
if (newTimeout != null) {
|
||||
logger.info("connection timeout set to [{}]", newTimeout);
|
||||
timeout = (int) newTimeout.seconds();
|
||||
}
|
||||
|
||||
String[] newHosts = settings.getAsArray(SETTINGS_HOSTS, null);
|
||||
if (newHosts != null) {
|
||||
logger.info("hosts set to [{}]", Strings.arrayToCommaDelimitedString(newHosts));
|
||||
this.hosts = newHosts;
|
||||
}
|
||||
}
|
||||
|
||||
interface MultiXContentRenderer {
|
||||
|
||||
int length();
|
||||
|
|
|
@ -335,6 +335,8 @@ define([
|
|||
return;
|
||||
}
|
||||
|
||||
$scope.panel.error = false;
|
||||
|
||||
var
|
||||
request,
|
||||
filter,
|
||||
|
@ -385,6 +387,11 @@ define([
|
|||
// populate the summary data based on the other facets
|
||||
newData = {};
|
||||
|
||||
// Check for error and abort if found
|
||||
if(!(_.isUndefined(r.error))) {
|
||||
$scope.panel.error = $scope.parse_error(r.error);
|
||||
return;
|
||||
}
|
||||
|
||||
_.each(r.facets['timestamp'].terms, function (f) {
|
||||
if (!$scope.panel.show_hidden && f.term[0] === ".") {
|
||||
|
|
|
@ -202,7 +202,8 @@ define([
|
|||
endpoint: null,
|
||||
urlPath: null,
|
||||
method: null,
|
||||
activeScheme: null
|
||||
activeScheme: null,
|
||||
editor: editor
|
||||
};
|
||||
|
||||
// context.updatedForToken = session.getTokenAt(pos.row, pos.column);
|
||||
|
@ -871,7 +872,7 @@ define([
|
|||
function getCompletions(aceEditor, session, pos, prefix, callback) {
|
||||
|
||||
|
||||
var context = getAutoCompleteContext(aceEditor, session, pos);
|
||||
var context = getAutoCompleteContext(editor, session, pos);
|
||||
if (!context) {
|
||||
callback(null, []);
|
||||
}
|
||||
|
|
|
@ -21,9 +21,8 @@ define([
|
|||
});
|
||||
}
|
||||
|
||||
function getRulesForPath(rules, tokenPath, scopeRules) {
|
||||
// scopeRules are the rules used to resolve relative scope links
|
||||
var walker = new json_rule_walker.RuleWalker(rules, scopeRules);
|
||||
function getRulesForPath(rules, tokenPath, context) {
|
||||
var walker = new json_rule_walker.RuleWalker(rules, context);
|
||||
walker.walkTokenPath(tokenPath);
|
||||
return walker.getNormalizedRules();
|
||||
|
||||
|
@ -47,7 +46,7 @@ define([
|
|||
function addAutocompleteForPath(autocompleteSet, rules, tokenPath, context) {
|
||||
// extracts the relevant parts of rules for tokenPath
|
||||
var initialRules = rules;
|
||||
rules = getRulesForPath(rules, tokenPath);
|
||||
rules = getRulesForPath(rules, tokenPath, context);
|
||||
|
||||
// apply rule set
|
||||
var term;
|
||||
|
@ -85,7 +84,7 @@ define([
|
|||
while (typeof rules_for_term.__template == "undefined" &&
|
||||
typeof rules_for_term.__scope_link != "undefined"
|
||||
) {
|
||||
rules_for_term = json_rule_walker.getLinkedRules(rules_for_term.__scope_link, initialRules);
|
||||
rules_for_term = json_rule_walker.getLinkedRules(rules_for_term.__scope_link, initialRules, context);
|
||||
}
|
||||
|
||||
if (typeof rules_for_term.__template != "undefined") {
|
||||
|
|
|
@ -3,14 +3,20 @@ define(['_', 'kb', 'exports'], function (_, kb, exports) {
|
|||
|
||||
var WALKER_MODE_EXPECTS_KEY = 1, WALKER_MODE_EXPECTS_CONTAINER = 2, WALKER_MODE_DONE = 3;
|
||||
|
||||
function RuleWalker(initialRules, scopeRules) {
|
||||
function RuleWalker(initialRules, context, scopeRules) {
|
||||
// scopeRules are the rules used to resolve relative scope links
|
||||
if (typeof scopeRules == "undefined") {
|
||||
scopeRules = initialRules;
|
||||
}
|
||||
|
||||
if ((initialRules || {}).__scope_link) {
|
||||
// normalize now
|
||||
initialRules = getLinkedRules(initialRules.__scope_link, scopeRules, context);
|
||||
}
|
||||
|
||||
this._rules = initialRules;
|
||||
this._mode = WALKER_MODE_EXPECTS_CONTAINER;
|
||||
this._context = context;
|
||||
this.scopeRules = scopeRules;
|
||||
}
|
||||
|
||||
|
@ -30,7 +36,11 @@ define(['_', 'kb', 'exports'], function (_, kb, exports) {
|
|||
return "value";
|
||||
}
|
||||
|
||||
function getLinkedRules(link, currentRules) {
|
||||
function getLinkedRules(link, currentRules, context) {
|
||||
if (_.isFunction(link)) {
|
||||
return link(context, currentRules)
|
||||
}
|
||||
|
||||
var link_path = link.split(".");
|
||||
var scheme_id = link_path.shift();
|
||||
var linked_rules = currentRules;
|
||||
|
@ -76,7 +86,7 @@ define(['_', 'kb', 'exports'], function (_, kb, exports) {
|
|||
new_rules = this._rules[token] || this._rules["*"]
|
||||
|| this._rules["$FIELD$"] || this._rules["$TYPE$"]; // we accept anything for a field.
|
||||
if (new_rules && new_rules.__scope_link) {
|
||||
new_rules = getLinkedRules(new_rules.__scope_link, this.scopeRules);
|
||||
new_rules = getLinkedRules(new_rules.__scope_link, this.scopeRules, this._context);
|
||||
}
|
||||
|
||||
switch (getRulesType(new_rules)) {
|
||||
|
|
|
@ -191,7 +191,7 @@ define([
|
|||
es.addServerChangeListener(function () {
|
||||
var version = es.getVersion(), api;
|
||||
if (!version || version.length == 0) {
|
||||
api = "kb/api_0_90";
|
||||
api = "kb/api_1_0";
|
||||
}
|
||||
else if (version[0] === "1") {
|
||||
api = "kb/api_1_0";
|
||||
|
|
|
@ -14,6 +14,7 @@ define([
|
|||
'./api_1_0/mappings',
|
||||
'./api_1_0/misc',
|
||||
'./api_1_0/query',
|
||||
'./api_1_0/snapshot_restore',
|
||||
'./api_1_0/search',
|
||||
'./api_1_0/settings',
|
||||
'./api_1_0/templates',
|
||||
|
|
154
sense/app/kb/api_1_0/snapshot_restore.js
Normal file
154
sense/app/kb/api_1_0/snapshot_restore.js
Normal file
|
@ -0,0 +1,154 @@
|
|||
define(function () {
|
||||
'use strict';
|
||||
|
||||
return function init(api) {
|
||||
api.addEndpointDescription('restore_snapshot', {
|
||||
methods: ['POST'],
|
||||
patterns: [
|
||||
'_snapshot/{id}/{id}/_restore'
|
||||
],
|
||||
url_params: {
|
||||
wait_for_completion: "__flag__"
|
||||
},
|
||||
data_autocomplete_rules: {
|
||||
indices: "*",
|
||||
ignore_unavailable: { __one_of: [ true, false] },
|
||||
include_global_state: false,
|
||||
rename_pattern: "index_(.+)",
|
||||
rename_replacement: "restored_index_$1"
|
||||
}
|
||||
});
|
||||
|
||||
api.addEndpointDescription('single_snapshot', {
|
||||
methods: ['GET', 'DELETE'],
|
||||
patterns: [
|
||||
'_snapshot/{id}/{id}'
|
||||
]
|
||||
});
|
||||
|
||||
api.addEndpointDescription('all_snapshots', {
|
||||
methods: ['GET'],
|
||||
patterns: [
|
||||
'_snapshot/{id}/_all'
|
||||
]
|
||||
});
|
||||
|
||||
api.addEndpointDescription('put_snapshot', {
|
||||
methods: ['PUT'],
|
||||
patterns: [
|
||||
'_snapshot/{id}/{id}'
|
||||
],
|
||||
url_params: {
|
||||
wait_for_completion: "__flag__"
|
||||
},
|
||||
data_autocomplete_rules: {
|
||||
indices: "*",
|
||||
ignore_unavailable: { __one_of: [ true, false] },
|
||||
include_global_state: { __one_of: [ true, false] },
|
||||
partial: { __one_of: [ true, false] }
|
||||
}
|
||||
});
|
||||
|
||||
function getRepositoryType(context) {
|
||||
var iter = context.editor.iterForCurrentLoc();
|
||||
// for now just iterate back to the first "type" key
|
||||
var t = iter.getCurrentToken();
|
||||
var type;
|
||||
while (t && t.type.indexOf("url") < 0) {
|
||||
if (t.type === 'variable' && t.value === '"type"') {
|
||||
t = context.editor.parser.nextNonEmptyToken(iter);
|
||||
if (!t || t.type !== "punctuation.colon") {
|
||||
// weird place to be in, but safe choice..
|
||||
break;
|
||||
}
|
||||
t = context.editor.parser.nextNonEmptyToken(iter);
|
||||
if (t && t.type === "string") {
|
||||
type = t.value.replace(/"/g, '');
|
||||
}
|
||||
break;
|
||||
}
|
||||
t = context.editor.parser.prevNonEmptyToken(iter);
|
||||
}
|
||||
return type;
|
||||
}
|
||||
|
||||
api.addEndpointDescription('put_repository', {
|
||||
methods: ['PUT'],
|
||||
patterns: [
|
||||
'_snapshot/{id}'
|
||||
],
|
||||
data_autocomplete_rules: {
|
||||
__scope_link: function (context) {
|
||||
var type = getRepositoryType(context);
|
||||
if (!type) {
|
||||
return {
|
||||
"type": {
|
||||
__one_of: ["fs", "url", "s3", "hdfs"]
|
||||
}
|
||||
}
|
||||
}
|
||||
return {
|
||||
"settings": {
|
||||
__scope_link: function (context) {
|
||||
var rules = {
|
||||
fs: {
|
||||
__template: {
|
||||
location: "path"
|
||||
},
|
||||
location: "path",
|
||||
compress: { __one_of: [ true, false]},
|
||||
concurrent_streams: 5,
|
||||
chunk_size: "10m",
|
||||
max_restore_bytes_per_sec: "20mb",
|
||||
max_snapshot_bytes_per_sec: "20mb"
|
||||
},
|
||||
url: {
|
||||
__template: {
|
||||
url: ""
|
||||
},
|
||||
url: "",
|
||||
concurrent_streams: 5
|
||||
},
|
||||
s3: {
|
||||
__template: {
|
||||
bucket: ""
|
||||
},
|
||||
bucket: "",
|
||||
region: "",
|
||||
base_path: "",
|
||||
concurrent_streams: 5,
|
||||
chunk_size: "10m",
|
||||
compress: { __one_of: [ true, false]}
|
||||
},
|
||||
hdfs: {
|
||||
__template: {
|
||||
path: ""
|
||||
},
|
||||
uri: "",
|
||||
path: "some/path",
|
||||
load_defaults: { __one_of: [ true, false]},
|
||||
conf_location: "cfg.xml",
|
||||
concurrent_streams: 5,
|
||||
compress: { __one_of: [ true, false]},
|
||||
chunk_size: "10m"
|
||||
}
|
||||
};
|
||||
|
||||
var type = getRepositoryType(context);
|
||||
|
||||
if (!type) {
|
||||
console.log("failed to resolve snapshot, defaulting to 'fs'");
|
||||
type = "fs";
|
||||
}
|
||||
|
||||
return rules[type];
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
;
|
||||
})
|
||||
;
|
|
@ -374,7 +374,7 @@ define([
|
|||
cursor: { row: 5, column: 21},
|
||||
initialValue: "",
|
||||
autoCompleteSet: [ 4, 5]
|
||||
},
|
||||
}
|
||||
]
|
||||
);
|
||||
|
||||
|
@ -572,7 +572,8 @@ define([
|
|||
"e": {},
|
||||
"f": [
|
||||
{}
|
||||
]
|
||||
],
|
||||
"g": {}
|
||||
}
|
||||
},
|
||||
MAPPING,
|
||||
|
@ -604,7 +605,15 @@ define([
|
|||
{
|
||||
__scope_link: "ext.target"
|
||||
}
|
||||
]
|
||||
],
|
||||
"g": {
|
||||
__scope_link: function () {
|
||||
return {
|
||||
"a": 1,
|
||||
"b": 2
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
|
@ -626,7 +635,7 @@ define([
|
|||
autoCompleteSet: [
|
||||
tt("b", {}), tt("c", {}), tt("d", {}), tt("e", {}), tt("f", [
|
||||
{}
|
||||
])
|
||||
]), tt("g", {})
|
||||
]
|
||||
},
|
||||
{
|
||||
|
@ -648,6 +657,44 @@ define([
|
|||
name: "A scope link within an array",
|
||||
cursor: { row: 7, column: 10},
|
||||
autoCompleteSet: [tt("t2", 1)]
|
||||
},
|
||||
{
|
||||
name: "A function based scope link",
|
||||
cursor: { row: 9, column: 12},
|
||||
autoCompleteSet: [tt("a", 1), tt("b", 2)]
|
||||
}
|
||||
]
|
||||
);
|
||||
|
||||
context_tests(
|
||||
{
|
||||
|
||||
},
|
||||
MAPPING,
|
||||
{
|
||||
globals: {
|
||||
gtarget: {
|
||||
t1: 2
|
||||
}
|
||||
},
|
||||
endpoints: {
|
||||
_current: {
|
||||
patterns: [ "_current" ],
|
||||
id: "POST _current",
|
||||
data_autocomplete_rules: {
|
||||
__scope_link: "GLOBAL.gtarget"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"POST _current",
|
||||
[
|
||||
{
|
||||
name: "Top level scope link",
|
||||
cursor: { row: 0, column: 1},
|
||||
autoCompleteSet: [
|
||||
tt("t1", 2)
|
||||
]
|
||||
}
|
||||
]
|
||||
);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue