[backport 7x] hash function of pipeline config with metadata (#12425)

add metadata in the hash function

Fixed #12387
This commit is contained in:
kaisecheng 2020-11-09 15:36:50 +01:00 committed by GitHub
parent 1ea4837b71
commit aba562e887
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 81 additions and 24 deletions

View file

@ -39,6 +39,7 @@ public class SourceWithMetadata implements HashableWithSource {
private final Integer column;
private final String text;
private int linesCount;
private final String metadata;
public String getProtocol() {
return this.protocol;
@ -60,14 +61,17 @@ public class SourceWithMetadata implements HashableWithSource {
return text;
}
public String getMetadata() { return metadata; }
private static final Pattern emptyString = Pattern.compile("^\\s*$");
public SourceWithMetadata(String protocol, String id, Integer line, Integer column, String text) throws IncompleteSourceWithMetadataException {
public SourceWithMetadata(String protocol, String id, Integer line, Integer column, String text, String metadata) throws IncompleteSourceWithMetadataException {
this.protocol = protocol;
this.id = id;
this.line = line;
this.column = column;
this.text = text;
this.metadata = metadata;
List<Object> badAttributes = this.attributes().stream().filter(a -> {
if (a == null) return true;
@ -82,8 +86,7 @@ public class SourceWithMetadata implements HashableWithSource {
}
if (!badAttributes.isEmpty()) {
String message = "Missing attributes in SourceWithMetadata: (" + badAttributes + ") "
+ this.toString();
String message = "Missing attributes in SourceWithMetadata: (" + badAttributes + ") " + this.toString();
throw new IncompleteSourceWithMetadataException(message);
}
@ -91,7 +94,15 @@ public class SourceWithMetadata implements HashableWithSource {
}
public SourceWithMetadata(String protocol, String id, String text) throws IncompleteSourceWithMetadataException {
this(protocol, id, 0, 0, text);
this(protocol, id, 0, 0, text, "");
}
public SourceWithMetadata(String protocol, String id, String text, String metadata) throws IncompleteSourceWithMetadataException {
this(protocol, id, 0, 0, text, metadata);
}
public SourceWithMetadata(String protocol, String id, Integer line, Integer column, String text) throws IncompleteSourceWithMetadataException {
this(protocol, id, line, column, text, "");
}
public int hashCode() {

View file

@ -60,6 +60,7 @@ public final class PipelineConfig {
private LocalDateTime readAt;
private String configHash;
private volatile String configString;
private volatile String metadata;
private List<LineToSource> sourceRefs;
private static final String NEWLINE = "\n";
@ -104,7 +105,7 @@ public final class PipelineConfig {
public String configHash() {
if (configHash == null) {
configHash = DigestUtils.sha1Hex(configString());
configHash = DigestUtils.sha1Hex(configString() + metadataString());
}
return configHash;
}
@ -129,6 +130,17 @@ public final class PipelineConfig {
return this.configString;
}
public String metadataString() {
if (this.metadata == null) {
synchronized(this) {
if (this.metadata == null) {
this.metadata = confParts.stream().map(SourceWithMetadata::getMetadata).collect(Collectors.joining());
}
}
}
return this.metadata;
}
public boolean isSystem() {
return this.settings.callMethod(RUBY.getCurrentContext(), "get_value",
RubyString.newString(RUBY, "pipeline.system"))

View file

@ -81,8 +81,8 @@ public class PipelineConfigTest extends RubyEnvTestCase {
private final StringBuilder compositeSource = new StringBuilder();
private final List<SourceWithMetadata> orderedConfigParts = new ArrayList<>();
void appendSource(final String protocol, final String id, final int line, final int column, final String text) throws IncompleteSourceWithMetadataException {
orderedConfigParts.add(new SourceWithMetadata(protocol, id, line, column, text));
void appendSource(final String protocol, final String id, final int line, final int column, final String text, final String metadata) throws IncompleteSourceWithMetadataException {
orderedConfigParts.add(new SourceWithMetadata(protocol, id, line, column, text, metadata));
if (compositeSource.length() > 0 && !compositeSource.toString().endsWith("\n")) {
compositeSource.append("\n");
@ -106,13 +106,13 @@ public class PipelineConfigTest extends RubyEnvTestCase {
pipelineIdSym = RubySymbol.newSymbol(RubyUtil.RUBY, PIPELINE_ID);
final SourceCollector sourceCollector = new SourceCollector();
sourceCollector.appendSource("file", "/tmp/1", 0, 0, "input { generator1 }\n");
sourceCollector.appendSource("file", "/tmp/2", 0, 0, "input { generator2 }");
sourceCollector.appendSource("file", "/tmp/3", 0, 0, "input { generator3 }\n");
sourceCollector.appendSource("file", "/tmp/4", 0, 0, "input { generator4 }");
sourceCollector.appendSource("file", "/tmp/5", 0, 0, "input { generator5 }\n");
sourceCollector.appendSource("file", "/tmp/6", 0, 0, "input { generator6 }");
sourceCollector.appendSource("string", "config_string", 0, 0, "input { generator1 }");
sourceCollector.appendSource("file", "/tmp/1", 0, 0, "input { generator1 }\n", "{\"version\": \"1\"}");
sourceCollector.appendSource("file", "/tmp/2", 0, 0, "input { generator2 }", "{\"version\": \"1\"}");
sourceCollector.appendSource("file", "/tmp/3", 0, 0, "input { generator3 }\n", "{\"version\": \"1\"}");
sourceCollector.appendSource("file", "/tmp/4", 0, 0, "input { generator4 }", "{\"version\": \"1\"}");
sourceCollector.appendSource("file", "/tmp/5", 0, 0, "input { generator5 }\n", "{\"version\": \"1\"}");
sourceCollector.appendSource("file", "/tmp/6", 0, 0, "input { generator6 }", "{\"version\": \"1\"}");
sourceCollector.appendSource("string", "config_string", 0, 0, "input { generator1 }", "{\"version\": \"1\"}");
orderedConfigParts = sourceCollector.orderedConfigParts();
configMerged = sourceCollector.compositeSource();
@ -129,6 +129,7 @@ public class PipelineConfigTest extends RubyEnvTestCase {
assertEquals("returns the source", source, sut.getSource());
assertEquals("returns the pipeline id", PIPELINE_ID, sut.getPipelineId());
assertNotNull("returns the config_hash", sut.configHash());
assertNotNull("returns the config_hash", sut.metadataString());
assertEquals("returns the merged `ConfigPart#config_string`", configMerged, sut.configString());
assertThat("records when the config was read", sut.getReadAt(), isBeforeOrSame(LocalDateTime.now()));
}

View file

@ -87,10 +87,11 @@ module LogStash
def get_pipeline(pipeline_id, fetcher)
config_string = fetcher.get_single_pipeline_setting(pipeline_id)["pipeline"]
pipeline_metadata_str = (fetcher.get_single_pipeline_setting(pipeline_id)["pipeline_metadata"] || "").to_s
raise RemoteConfigError, "Empty configuration for pipeline_id: #{pipeline_id}" if config_string.nil? || config_string.empty?
config_part = org.logstash.common.SourceWithMetadata.new("x-pack-config-management", pipeline_id.to_s, config_string)
config_part = org.logstash.common.SourceWithMetadata.new("x-pack-config-management", pipeline_id.to_s, config_string, pipeline_metadata_str)
# We don't support multiple pipelines, so use the global settings from the logstash.yml file
settings = @settings.clone

View file

@ -177,6 +177,29 @@ describe "Read configuration from elasticsearch" do
end
end
it "should pick up recreated pipeline with the same config string and different metadata" do
elasticsearch_client.indices.refresh
pipeline_id = @pipelines.keys[0]
config = @pipelines.values[0]
file = File.join(@temporary_directory, pipeline_id)
Stud.try(max_retry.times, [RSpec::Expectations::ExpectationNotMetError]) do
expect(File.exist?(file)).to be_truthy
end
cleanup_system_indices([pipeline_id])
File.delete(file)
expect(File.exist?(file)).to be_falsey
push_elasticsearch_config(pipeline_id, config, "2")
elasticsearch_client.indices.refresh
Stud.try(max_retry.times, [RSpec::Expectations::ExpectationNotMetError]) do
expect(File.exist?(file)).to be_truthy
end
end
after :each do
@logstash_service.stop if !!@logstash_service
@elasticsearch_service.stop if !!@elasticsearch_service

View file

@ -97,13 +97,15 @@ end
def es_version
response = elasticsearch_client.perform_request(:get, "")
response.body["version"]["number"].gsub(/(\d+\.\d+)\..+/, '\1').to_f
major, minor = response.body["version"]["number"].split(".")
[major.to_i, minor.to_i]
end
def push_elasticsearch_config(pipeline_id, config)
if es_version >= 7.10
def push_elasticsearch_config(pipeline_id, config, version="1")
major, minor = es_version
if major >= 8 || (major == 7 && minor >= 10)
elasticsearch_client.perform_request(:put, "_logstash/pipeline/#{pipeline_id}", {},
{ :pipeline => config, :username => "log.stash", :pipeline_metadata => {:version => "1" },
{ :pipeline => config, :username => "log.stash", :pipeline_metadata => {:version => version },
:pipeline_settings => {"pipeline.batch.delay": "50"}, :last_modified => Time.now.utc.iso8601})
else
elasticsearch_client.index :index => '.logstash', :type => "_doc", id: pipeline_id, :body => { :pipeline => config }
@ -116,13 +118,20 @@ def cleanup_elasticsearch(index = MONITORING_INDEXES)
end
def cleanup_system_indices(pipeline_ids)
pipeline_ids.each do |id|
begin
elasticsearch_client.perform_request(:delete, "_logstash/pipeline/#{id}")
rescue Elasticsearch::Transport::Transport::Errors::NotFound => e
puts ".logstash can be empty #{e.message}"
major, minor = es_version
if major >= 8 || (major == 7 && minor >= 10)
pipeline_ids.each do |id|
begin
elasticsearch_client.perform_request(:delete, "_logstash/pipeline/#{id}")
rescue Elasticsearch::Transport::Transport::Errors::NotFound => e
puts ".logstash can be empty #{e.message}"
end
end
else
cleanup_elasticsearch(".logstash*")
end
elasticsearch_client.indices.refresh
end