LIR serializer refactor

Fixes #10561
This commit is contained in:
Mike Place 2019-03-14 13:14:52 -06:00 committed by Shaunak Kashyap
parent 2dbe5c1bc9
commit 30adb7a565
4 changed files with 144 additions and 120 deletions

View file

@ -6,7 +6,7 @@ require "logstash/inputs/base"
require "logstash/outputs/base"
require "logstash/instrument/collector"
require "logstash/compiler"
require "monitoring/inputs/metrics/state_event/lir_serializer"
require "logstash/lir_serializer"
module LogStash; class JavaPipeline < JavaBasePipeline
include LogStash::Util::Loggable
@ -219,7 +219,7 @@ module LogStash; class JavaPipeline < JavaBasePipeline
config_metric.gauge(:dead_letter_queue_path, dlq_writer.get_path.to_absolute_path.to_s) if dlq_enabled?
config_metric.gauge(:ephemeral_id, ephemeral_id)
config_metric.gauge(:hash, lir.unique_hash)
config_metric.gauge(:graph, ::LogStash::Inputs::Metrics::StateEvent::LIRSerializer.serialize(lir))
config_metric.gauge(:graph, ::LogStash::LIRSerializer.serialize(lir))
@logger.info("Starting pipeline", default_logging_keys(
"pipeline.workers" => pipeline_workers,

View file

@ -0,0 +1,138 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#
#FIXME
#require 'monitoring/inputs/metrics'
require 'logstash-core'
require 'logstash/compiler'
#FIXME
#module LogStash; module Inputs; class Metrics; class StateEvent;
module LogStash;
class LIRSerializer
attr_reader :lir_pipeline
def self.serialize(lir_pipeline)
self.new(lir_pipeline).serialize
end
def initialize(lir_pipeline)
@lir_pipeline = lir_pipeline
end
def serialize
{
"hash" => lir_pipeline.unique_hash,
"type" => "lir",
"version" => "0.0.0",
"graph" => {
"vertices" => vertices,
"edges" => edges
}
}
end
def vertices
graph.getVertices.map {|v| vertex(v) }
end
def edges
graph.getEdges.map {|e| edge(e) }
end
def graph
lir_pipeline.graph
end
def vertex(v)
hashified_vertex = case vertex_type(v)
when :plugin
plugin_vertex(v)
when :if
if_vertex(v)
when :queue
queue_vertex(v)
end
decorate_vertex(v, hashified_vertex)
end
def vertex_type(v)
if v.java_kind_of?(org.logstash.config.ir.graph.PluginVertex)
:plugin
elsif v.java_kind_of?(org.logstash.config.ir.graph.IfVertex)
:if
elsif v.java_kind_of?(org.logstash.config.ir.graph.QueueVertex)
:queue
else
raise "Unexpected vertex type! #{v}"
end
end
def decorate_vertex(v, v_json)
v_json["meta"] = format_swm(v.source_with_metadata)
v_json["id"] = v.id
v_json["explicit_id"] = !!v.explicit_id
v_json["type"] = vertex_type(v).to_s
v_json
end
def plugin_vertex(v)
pd = v.plugin_definition
{
"config_name" => pd.name,
"plugin_type" => pd.getType.to_s.downcase
}
end
def if_vertex(v)
{
"condition" => v.humanReadableExpression
}
end
def queue_vertex(v)
{}
end
def edge(e)
e_json = {
"from" => e.from.id,
"to" => e.to.id,
"id" => e.id
}
if e.java_kind_of?(org.logstash.config.ir.graph.BooleanEdge)
e_json["when"] = e.edge_type
e_json["type"] = "boolean"
else
e_json["type"] = "plain"
end
e_json
end
def format_swm(source_with_metadata)
return nil unless source_with_metadata
{
"source" => {
"protocol" => source_with_metadata.protocol,
"id" => source_with_metadata.id,
"line" => source_with_metadata.line,
"column" => source_with_metadata.column
# We omit the text of the source code for security reasons
# raw text may contain passwords
}
}
end
# def plugins
# ::Gem::Specification.
# find_all.
# select {|spec| spec.metadata && spec.metadata["logstash_plugin"] == "true"}.
# map {|spec| { :name => spec.name, :version => spec.version.to_s } }
# end
end
end

View file

@ -5,9 +5,10 @@
require 'monitoring/inputs/metrics'
require 'logstash-core'
require 'logstash/compiler'
require 'logstash/lir_serializer'
module LogStash; module Inputs; class Metrics; class StateEvent;
class LIRSerializer
class XPackLIRSerializer < LIRSerializer
attr_reader :lir_pipeline
def self.serialize(lir_pipeline)
@ -17,119 +18,4 @@ module LogStash; module Inputs; class Metrics; class StateEvent;
def initialize(lir_pipeline)
@lir_pipeline = lir_pipeline
end
def serialize
{
"hash" => lir_pipeline.unique_hash,
"type" => "lir",
"version" => "0.0.0",
"plugins" => plugins,
"graph" => {
"vertices" => vertices,
"edges" => edges
}
}
end
def vertices
graph.getVertices.map {|v| vertex(v) }
end
def edges
graph.getEdges.map {|e| edge(e) }
end
def graph
lir_pipeline.graph
end
def vertex(v)
hashified_vertex = case vertex_type(v)
when :plugin
plugin_vertex(v)
when :if
if_vertex(v)
when :queue
queue_vertex(v)
end
decorate_vertex(v, hashified_vertex)
end
def vertex_type(v)
if v.java_kind_of?(org.logstash.config.ir.graph.PluginVertex)
:plugin
elsif v.java_kind_of?(org.logstash.config.ir.graph.IfVertex)
:if
elsif v.java_kind_of?(org.logstash.config.ir.graph.QueueVertex)
:queue
else
raise "Unexpected vertex type! #{v}"
end
end
def decorate_vertex(v, v_json)
v_json["meta"] = format_swm(v.source_with_metadata)
v_json["id"] = v.id
v_json["explicit_id"] = !!v.explicit_id
v_json["type"] = vertex_type(v).to_s
v_json
end
def plugin_vertex(v)
pd = v.plugin_definition
{
"config_name" => pd.name,
"plugin_type" => pd.getType.to_s.downcase
}
end
def if_vertex(v)
{
"condition" => v.humanReadableExpression
}
end
def queue_vertex(v)
{}
end
def edge(e)
e_json = {
"from" => e.from.id,
"to" => e.to.id,
"id" => e.id
}
if e.java_kind_of?(org.logstash.config.ir.graph.BooleanEdge)
e_json["when"] = e.edge_type
e_json["type"] = "boolean"
else
e_json["type"] = "plain"
end
e_json
end
def format_swm(source_with_metadata)
return nil unless source_with_metadata
{
"source" => {
"protocol" => source_with_metadata.protocol,
"id" => source_with_metadata.id,
"line" => source_with_metadata.line,
"column" => source_with_metadata.column
# We omit the text of the source code for security reasons
# raw text may contain passwords
}
}
end
def plugins
::Gem::Specification.
find_all.
select {|spec| spec.metadata && spec.metadata["logstash_plugin"] == "true"}.
map {|spec| { :name => spec.name, :version => spec.version.to_s } }
end
end
end; end; end; end
end; end; end; end; end;

View file

@ -29,7 +29,7 @@ module LogStash; module Inputs; class Metrics;
"ephemeral_id" => pipeline.ephemeral_id,
"workers" => pipeline.settings.get("pipeline.workers"),
"batch_size" => pipeline.settings.get("pipeline.batch.size"),
"representation" => ::LogStash::Inputs::Metrics::StateEvent::LIRSerializer.serialize(pipeline.lir)
"representation" => ::LogStash::Inputs::Metrics::StateEvent::XPackLIRSerializer.serialize(pipeline.lir)
}
end