Merge branch 'master' into experimental/petef-jruby

This commit is contained in:
Pete Fritchman 2011-02-13 17:58:42 -08:00
commit 0c776e071d
26 changed files with 826 additions and 205 deletions

View file

@ -36,6 +36,8 @@ def check_libraries
"needed for websocket output") "needed for websocket output")
results << check_lib("rack", "rack", true, results << check_lib("rack", "rack", true,
"needed for logstash-web") "needed for logstash-web")
results << check_lib("thin", "thin", true,
"needed for logstash-web")
results << check_lib("amqp", "amqp", true, results << check_lib("amqp", "amqp", true,
"needed for AMQP input and output") "needed for AMQP input and output")
results << check_lib("sinatra/async", "async_sinatra", true, results << check_lib("sinatra/async", "async_sinatra", true,
@ -46,6 +48,8 @@ def check_libraries
"improve logstash debug logging output") "improve logstash debug logging output")
results << check_lib("eventmachine", "eventmachine", false, results << check_lib("eventmachine", "eventmachine", false,
"required for logstash to function") "required for logstash to function")
results << check_lib("json", "json", false,
"required for logstash to function")
missing_required = results.count { |r| !r[:optional] and !r[:found] } missing_required = results.count { |r| !r[:optional] and !r[:found] }
if missing_required == 0 if missing_required == 0
@ -66,6 +70,8 @@ end
def main(args) def main(args)
report_ruby_version report_ruby_version
# TODO(sissel): Add a way to call out specific things to test, like
# logstash-web, elasticsearch, mongodb, syslog, etc.
if !check_libraries if !check_libraries
puts "Library check failed." puts "Library check failed."
return 1 return 1

View file

@ -2,4 +2,5 @@ module LogStash
module Inputs; end module Inputs; end
module Outputs; end module Outputs; end
module Filters; end module Filters; end
module Search; end
end # module LogStash end # module LogStash

View file

@ -41,6 +41,9 @@ class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
}, # "settings" }, # "settings"
} # ES Index } # ES Index
#puts :waiting
puts @esurl.to_s
#sleep 10
indexurl = @esurl.to_s indexurl = @esurl.to_s
indexmap_http = EventMachine::HttpRequest.new(indexurl) indexmap_http = EventMachine::HttpRequest.new(indexurl)
indexmap_req = indexmap_http.put :body => indexmap.to_json indexmap_req = indexmap_http.put :body => indexmap.to_json
@ -49,8 +52,12 @@ class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
ready(params) ready(params)
end end
indexmap_req.errback do indexmap_req.errback do
@logger.warn(["Failure configuring index", @esurl.to_s, indexmap]) @logger.warn(["Failure configuring index (http failed to connect?)",
@esurl.to_s, indexmap])
@logger.warn([indexmap_req])
#sleep 30
raise "Failure configuring index: #{@esurl.to_s}" raise "Failure configuring index: #{@esurl.to_s}"
end end
end # def register end # def register

View file

@ -0,0 +1,39 @@
require "logstash/namespace"
require "logstash/logging"
require "logstash/event"
class LogStash::Search::Base
# Do a search.
#
# This method is async. You can expect a block and therefore
# should yield a result, not return one.
#
# Implementations should yield a LogStash::Search::Result
# LogStash::Search::Result#events must be an array of LogStash::Event
def search(query)
raise "The class #{self.class.name} must implement the 'search' method."
end # def search
# Yields a histogram by field of a query.
#
# This method is async. You should expect a block to be passed and therefore
# should yield a result, not return one.
#
# Implementations should yield a LogStash::Search::FacetResult::Histogram
def histogram(query, field, interval=nil)
raise "The class #{self.class.name} must implement the 'histogram' method."
end
# Returns a list of popular terms from a query
# TODO(sissel): Implement
def popular_terms(query, fields, count=10)
raise "The class #{self.class.name} must implement the 'popular_terms' method."
end
# Count the results given by a query.
def count(query)
raise "The class #{self.class.name} must implement the 'count' method."
end
end # class LogStash::Search::Base

View file

@ -0,0 +1,196 @@
require "em-http-request"
require "logstash/namespace"
require "logstash/logging"
require "logstash/event"
require "logstash/search/base"
require "logstash/search/query"
require "logstash/search/result"
require "logstash/search/facetresult"
require "logstash/search/facetresult/histogram"
class LogStash::Search::ElasticSearch < LogStash::Search::Base
public
def initialize(settings={})
@host = (settings[:host] || "localhost")
@port = (settings[:port] || 9200).to_i
@logger = LogStash::Logger.new(STDOUT)
end
# See LogStash::Search;:Base#search
public
def search(query)
raise "No block given for search call." if !block_given?
if query.is_a?(String)
query = LogStash::Search::Query.parse(query)
end
# TODO(sissel): only search a specific index?
http = EventMachine::HttpRequest.new("http://#{@host}:#{@port}/_search")
@logger.info(["Query", query])
esreq = {
"sort" => [
{ "@timestamp" => "desc" }
],
"query" => {
"query_string" => {
"query" => query.query_string,
"default_operator" => "AND"
} # query_string
}, # query
"from" => query.offset,
"size" => query.count
} # elasticsearch request
@logger.info("ElasticSearch Query: #{esreq.to_json}")
start_time = Time.now
req = http.get :body => esreq.to_json
result = LogStash::Search::Result.new
req.callback do
data = JSON.parse(req.response)
result.duration = Time.now - start_time
hits = data["hits"]["hits"] rescue nil
if hits.nil? or !data["error"].nil?
# Use the error message if any, otherwise, return the whole
# data object as json as the error message for debugging later.
result.error_message = (data["error"] rescue false) || data.to_json
yield result
next
end
@logger.info(["Got search results",
{ :query => query.query_string, :duration => data["duration"],
:result_count => hits.size }])
if req.response_header.status != 200
result.error_message = data["error"] || req.inspect
@error = data["error"] || req.inspect
end
# We want to yield a list of LogStash::Event objects.
hits.each do |hit|
result.events << LogStash::Event.new(hit["_source"])
end
# Total hits this search could find if not limited
result.total = data["hits"]["total"]
result.offset = query.offset
yield result
end
req.errback do
@logger.warn(["Query failed", query, req, req.response])
result.duration = Time.now - start_time
result.error_message = req.response
#yield result
yield({ "error" => req.response })
end
end # def search
# See LogStash::Search;:Base#histogram
public
def histogram(query, field, interval=nil)
if query.is_a?(String)
query = LogStash::Search::Query.parse(query)
end
# TODO(sissel): only search a specific index?
http = EventMachine::HttpRequest.new("http://#{@host}:#{@port}/_search")
@logger.info(["Query", query])
histogram_settings = {
"field" => field
}
if !interval.nil? && interval.is_a?(Numeric)
histogram_settings["interval"] = interval
end
esreq = {
"query" => {
"query_string" => {
"query" => query.query_string,
"default_operator" => "AND"
} # query_string
}, # query
"from" => 0,
"size" => 0,
"facets" => {
"amazingpants" => { # just a name for this histogram...
"histogram" => histogram_settings,
},
},
} # elasticsearch request
@logger.info("ElasticSearch Facet Query: #{esreq.to_json}")
start_time = Time.now
req = http.get :body => esreq.to_json
result = LogStash::Search::FacetResult.new
req.callback do
data = JSON.parse(req.response)
result.duration = Time.now - start_time
@logger.info(["Got search results",
{ :query => query.query_string, :duration => data["duration"] }])
if req.response_header.status != 200
result.error_message = data["error"] || req.inspect
@error = data["error"] || req.inspect
end
entries = data["facets"]["amazingpants"]["entries"] rescue nil
if entries.nil? or !data["error"].nil?
# Use the error message if any, otherwise, return the whole
# data object as json as the error message for debugging later.
result.error_message = (data["error"] rescue false) || data.to_json
yield result
next
end
entries.each do |entry|
# entry is a hash of keys 'total', 'mean', 'count', and 'key'
hist_entry = LogStash::Search::FacetResult::Histogram.new
hist_entry.key = entry["key"]
hist_entry.count = entry["count"]
result.results << hist_entry
end # for each histogram result
yield result
end # request callback
req.errback do
@logger.warn(["Query failed", query, req, req.response])
result.duration = Time.now - start_time
result.error_message = req.response
yield result
#yield({ "error" => req.response })
end
end
# Not used. Needs refactoring elsewhere.
private
def __anonymize
# TODO(sissel): Plugin-ify this (Search filters!)
# TODO(sissel): Implement
# Search anonymization
#require "digest/md5"
#data["hits"]["hits"].each do |hit|
[].each do |hit|
event = LogStash::Event.new(hit["_source"])
event.to_hash.each do |key, value|
next unless value.is_a?(String)
value.gsub!(/[^ ]+\.loggly\.net/) { |match| "loggly-" + Digest::MD5.hexdigest(match)[0..6] + ".example.com"}
end
event.fields.each do |key, value|
value = [value] if value.is_a?(String)
next unless value.is_a?(Array)
value.each do |v|
v.gsub!(/[^ ]+\.loggly\.net/) { |match| "loggly-" + Digest::MD5.hexdigest(match)[0..6] + ".example.com"}
end # value.each
end # hit._source.@fields.each
end # data.hits.hits.each
end # def __anonymize
end # class LogStash::Search::ElasticSearch

View file

@ -0,0 +1,25 @@
require "logstash/namespace"
require "logstash/logging"
class LogStash::Search::FacetResult
# Array of LogStash::Search::FacetResult::Entry
attr_accessor :results
# How long this query took, in seconds (or fractions of).
attr_accessor :duration
# Error message, if any.
attr_accessor :error_message
def initialize(settings={})
@results = []
@duration = nil
@error_message = nil
end
def error?
return !@error_message.nil?
end
end # class LogStash::Search::FacetResult

View file

@ -0,0 +1,6 @@
require "logstash/search/facetresult"
class LogStash::Search::FacetResult::Entry
# nothing here
end # class LogStash::Search::FacetResult::Entry

View file

@ -0,0 +1,21 @@
require "json"
require "logstash/search/facetresult/entry"
class LogStash::Search::FacetResult::Histogram < LogStash::Search::FacetResult::Entry
# The name or key for this result.
attr_accessor :key
attr_accessor :mean
attr_accessor :total
attr_accessor :count
# sometimes a parent call to to_json calls us with args?
def to_json(*args)
return {
"key" => @key,
"mean" => @mean,
"total" => @total,
"count" => @count,
}.to_json
end
end

View file

@ -0,0 +1,35 @@
require "logstash/namespace"
require "logstash/logging"
class LogStash::Search::Query
# The query string
attr_accessor :query_string
# The offset to start at (like SQL's SELECT ... OFFSET n)
attr_accessor :offset
# The max number of results to return. (like SQL's SELECT ... LIMIT n)
attr_accessor :count
# New query object.
#
# 'settings' should be a hash containing:
#
# * :query_string - a string query for searching
# * :offset - (optional, default 0) offset to search from
# * :count - (optional, default 50) max number of results to return
def initialize(settings)
@query_string = settings[:query_string]
@offset = settings[:offset] || 0
@count = settings[:count] || 50
end
# Class method. Parses a query string and returns
# a LogStash::Search::Query instance
def self.parse(query_string)
# TODO(sissel): I would prefer not to invent my own query language.
# Can we be similar to Lucene, SQL, or other query languages?
return self.new(:query_string => query_string)
end
end # class LogStash::Search::Query

View file

@ -0,0 +1,39 @@
require "logstash/namespace"
require "logstash/logging"
class LogStash::Search::Result
# Array of LogStash::Event of results
attr_accessor :events
# How long this query took, in seconds (or fractions of).
attr_accessor :duration
# Offset in search
attr_accessor :offset
# Total records matched by this query, regardless of offset/count in query.
attr_accessor :total
# Error message, if any.
attr_accessor :error_message
def initialize(settings={})
@events = []
@duration = nil
@error_message = nil
end
def error?
return !@error_message.nil?
end
def to_json
return {
"events" => @events,
"duration" => @duration,
"offset" => @offset,
"total" => @total,
}.to_json
end # def to_json
end # class LogStash::Search::Result

View file

@ -0,0 +1,90 @@
require "em-http-request"
require "logstash/namespace"
require "logstash/logging"
require "logstash/event"
require "logstash/search/base"
require "logstash/search/query"
require "logstash/search/result"
require "logstash/search/facetresult"
require "logstash/search/facetresult/histogram"
class LogStash::Search::Twitter < LogStash::Search::Base
public
def initialize(settings={})
@host = (settings[:host] || "search.twitter.com")
@port = (settings[:port] || 80).to_i
@logger = LogStash::Logger.new(STDOUT)
end
public
def search(query)
raise "No block given for search call." if !block_given?
if query.is_a?(String)
query = LogStash::Search::Query.parse(query)
end
# TODO(sissel): only search a specific index?
http = EventMachine::HttpRequest.new("http://#{@host}:#{@port}/search.json?q=#{URI.escape(query.query_string)}&rpp=#{URI.escape(query.count) rescue query.count}")
@logger.info(["Query", query])
start_time = Time.now
req = http.get
result = LogStash::Search::Result.new
req.callback do
data = JSON.parse(req.response)
result.duration = Time.now - start_time
hits = (data["results"] || nil) rescue nil
if hits.nil? or !data["error"].nil?
# Use the error message if any, otherwise, return the whole
# data object as json as the error message for debugging later.
result.error_message = (data["error"] rescue false) || data.to_json
yield result
next
end
hits.each do |hit|
hit["@message"] = hit["text"]
hit["@timestamp"] = hit["created_at"]
hit.delete("text")
end
@logger.info(["Got search results",
{ :query => query.query_string, :duration => data["duration"],
:result_count => hits.size }])
if req.response_header.status != 200
result.error_message = data["error"] || req.inspect
@error = data["error"] || req.inspect
end
# We want to yield a list of LogStash::Event objects.
hits.each do |hit|
result.events << LogStash::Event.new(hit)
end
# Total hits this search could find if not limited
result.total = hits.size
result.offset = 0
yield result
end
req.errback do
@logger.warn(["Query failed", query, req, req.response])
result.duration = Time.now - start_time
result.error_message = req.response
yield result
end
end # def search
def histogram(query, field, interval=nil)
# Nothing to histogram.
result = LogStash::Search::FacetResult.new
yield result
end
end # class LogStash::Search::ElasticSearch

View file

@ -0,0 +1,17 @@
require "sinatra/base"
module Sinatra
module RequireParam
def require_param(*fields)
missing = []
fields.each do |field|
if params[field].nil?
missing << field
end
end
return missing
end # def require_param
end # module RequireParam
helpers RequireParam
end # module Sinatra

View file

@ -1,86 +0,0 @@
require "em-http-request"
require "logstash/namespace"
require "logstash/logging"
require "logstash/event"
module LogStash::Web; end
class LogStash::Web::ElasticSearch
public
def initialize(settings)
@port = (settings[:port] || 9200).to_i
@logger = LogStash::Logger.new(STDOUT)
end
public
def search(params)
http = EventMachine::HttpRequest.new("http://localhost:#{@port}/_search")
params[:offset] ||= 0
params[:count] ||= 20
@logger.info(["Query", params])
esreq = {
"sort" => [
{ "@timestamp" => "desc" }
],
"query" => {
"query_string" => {
"query" => params[:q],
"default_operator" => "AND"
} # query_string
}, # query
"facets" => {
"by_hour" => {
"histogram" => {
"field" => "@timestamp",
"time_interval" => "1h",
}, # histogram
}, # by_hour
}, # facets
"from" => params[:offset],
"size" => params[:count],
}
@logger.info("ElasticSearch Query: #{esreq.to_json}")
start_time = Time.now
req = http.get :body => esreq.to_json
req.callback do
#headers req.response_header
data = JSON.parse(req.response)
data["duration"] = Time.now - start_time
# TODO(sissel): Plugin-ify this (Search filters!)
# Search anonymization
#require "digest/md5"
#data["hits"]["hits"].each do |hit|
[].each do |hit|
event = LogStash::Event.new(hit["_source"])
event.to_hash.each do |key, value|
next unless value.is_a?(String)
value.gsub!(/[^ ]+\.loggly\.net/) { |match| "loggly-" + Digest::MD5.hexdigest(match)[0..6] + ".example.com"}
end
event.fields.each do |key, value|
value = [value] if value.is_a?(String)
next unless value.is_a?(Array)
value.each do |v|
v.gsub!(/[^ ]+\.loggly\.net/) { |match| "loggly-" + Digest::MD5.hexdigest(match)[0..6] + ".example.com"}
end # value.each
end # hit._source.@fields.each
end # data.hits.hits.each
@logger.info(["Got search results",
{ :query => params[:q], :duration => data["duration"]}])
#@logger.info(data)
if req.response_header.status != 200
@error = data["error"] || req.inspect
end
yield data
end
req.errback do
@logger.warn(["Query failed", params, req, req.response])
yield({ "error" => req.response })
end
end # def search
end # class LogStash::Web::ElasticSearch

View file

@ -1,4 +1,6 @@
(function() { (function() {
// TODO(sissel): Write something that will use history.pushState and fall back
// to document.location.hash madness.
var logstash = { var logstash = {
params: { params: {
@ -6,21 +8,78 @@
count: 50, count: 50,
}, },
search: function(query) { search: function(query, options) {
if (query == undefined || query == "") { if (query == undefined || query == "") {
return; return;
} }
//console.log("Searching: " + query);
/* Default options */
if (typeof(options) == 'undefined') {
options = { graph: true };
}
var display_query = query.replace("<", "&lt;").replace(">", "&gt;") var display_query = query.replace("<", "&lt;").replace(">", "&gt;")
$("#querystatus").html("Loading query '" + display_query + "'") $("#querystatus, #results h1").html("Loading query '" + display_query + "' (offset:" + logstash.params.offset + ", count:" + logstash.params.count + ") <img class='throbber' src='/media/construction.gif'>")
//console.log(logstash.params) //console.log(logstash.params)
logstash.params.q = query; logstash.params.q = query;
document.location.hash = escape(JSON.stringify(logstash.params)); document.location.hash = escape(JSON.stringify(logstash.params));
$("#results").load("/search/ajax", logstash.params);
/* Load the search results */
$("#results").load("/api/search?format=html", logstash.params);
if (options.graph != false) {
/* Load the default histogram graph */
logstash.params.interval = 3600000; /* 1 hour, default */
logstash.histogram();
} /* if options.graph != false */
$("#query").val(logstash.params.q); $("#query").val(logstash.params.q);
}, /* search */ }, /* search */
histogram: function(tries) {
if (typeof(tries) == 'undefined') {
tries = 7;
}
/* GeoCities mode on the graph while waiting ...
* This won't likely survive 1.0, but it's fun for now... */
$("#visual").html("<center><img src='/media/truckconstruction.gif'><center>");
jQuery.getJSON("/api/histogram", logstash.params, function(histogram, text, jqxhr) {
/* Load the data into the graph */
var flot_data = [];
// histogram is an array of { "key": ..., "count": ... }
for (var i in histogram) {
flot_data.push([parseInt(histogram[i]["key"]), histogram[i]["count"]])
}
//console.log(histogram);
/* Try to be intelligent about how we choose the histogram interval.
* If there are too few data points, try a smaller interval.
* If there are too many data points, try a larger interval.
* Give up after a few tries and go with the last result.
*
* This queries the backend several times, but should be reasonably
* speedy as this behaves roughly as a binary search. */
if (flot_data.length < 6 && flot_data.length > 0 && tries > 0) {
//console.log("Histogram bucket " + logstash.params.interval + " has only " + flot_data.length + " data points, trying smaller...");
logstash.params.interval /= 2;
if (logstash.params.interval < 1000) {
tries = 0; /* stop trying, too small... */
logstash.plot(flot_data, logstash.params.interval);
return;
}
logstash.histogram(tries - 1);
} else if (flot_data.length > 50 && tries > 0) {
//console.log("Histogram bucket " + logstash.params.interval + " too many (" + flot_data.length + ") data points, trying larger interval...");
logstash.params.interval *= 2;
logstash.histogram(tries - 1);
} else {
//console.log("Histo:" + logstash.params.interval);
logstash.plot(flot_data, logstash.params.interval);
}
});
},
parse_params: function(href) { parse_params: function(href) {
var query = href.replace(/^[^?]*\?/, ""); var query = href.replace(/^[^?]*\?/, "");
if (query == href) { if (query == href) {
@ -48,14 +107,15 @@
logstash.search(newquery.trim()); logstash.search(newquery.trim());
}, /* appendquery */ }, /* appendquery */
plot: function(data) { plot: function(data, interval) {
var target = $("#visual"); var target = $("#visual");
target.css("display", "block");
var plot = $.plot(target, var plot = $.plot(target,
[ { /* data */ [ { /* data */
data: data, data: data,
bars: { bars: {
show: true, show: true,
barWidth: 3600000, barWidth: interval,
} }
} ], } ],
{ /* options */ { /* options */
@ -67,8 +127,12 @@
target.bind("plotclick", function(e, pos, item) { target.bind("plotclick", function(e, pos, item) {
if (item) { if (item) {
start = logstash.ms_to_iso8601(item.datapoint[0]); start = logstash.ms_to_iso8601(item.datapoint[0]);
end = logstash.ms_to_iso8601(item.datapoint[0] + 3600000); end = logstash.ms_to_iso8601(item.datapoint[0] + interval);
/* Clicking on the graph means a new search, means
* we probably don't want to keep the old offset since
* the search results will change. */
logstash.params.offset = 0;
logstash.appendquery("@timestamp:[" + start + " TO " + end + "]"); logstash.appendquery("@timestamp:[" + start + " TO " + end + "]");
} }
}); });
@ -125,13 +189,16 @@
for (var p in params) { for (var p in params) {
logstash.params[p] = params[p]; logstash.params[p] = params[p];
} }
logstash.search(logstash.params.q) logstash.search(logstash.params.q, { graph: false })
return false; return false;
}); });
var result_row_selector = "table.results tr.event"; var result_row_selector = "table.results tr.event";
$(result_row_selector).live("click", function() { $(result_row_selector).live("click", function() {
var data = eval($("td.message", this).data("full")); var data = $("td.message", this).data("full");
if (typeof(data) == "string") {
data = JSON.parse(data);
}
/* Apply template to the dialog */ /* Apply template to the dialog */
var query = $("#query").val().replace(/^\s+|\s+$/g, "") var query = $("#query").val().replace(/^\s+|\s+$/g, "")
@ -155,8 +222,8 @@
/* TODO(sissel): recurse through the data */ /* TODO(sissel): recurse through the data */
var fields = new Array(); var fields = new Array();
for (var i in data._source["@fields"]) { for (var i in data["@fields"]) {
var value = data._source["@fields"][i] var value = data["@fields"][i]
if (/^[, ]*$/.test(value)) { if (/^[, ]*$/.test(value)) {
continue; /* Skip empty data fields */ continue; /* Skip empty data fields */
} }
@ -166,9 +233,9 @@
fields.push( { type: "field", field: i, value: value }) fields.push( { type: "field", field: i, value: value })
} }
for (var i in data._source) { for (var i in data) {
if (i == "@fields") continue; if (i == "@fields") continue;
var value = data._source[i] var value = data[i]
if (!(value instanceof Array)) { if (!(value instanceof Array)) {
value = [value]; value = [value];
} }

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.7 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.5 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 22 KiB

View file

@ -1,4 +1,7 @@
#!/usr/bin/env ruby #!/usr/bin/env ruby
# I don't want folks to have to learn to use yet another tool (rackup)
# just to launch logstash-web. So let's work like a standard ruby
# executable.
##rackup -Ilib:../lib -s thin ##rackup -Ilib:../lib -s thin
$:.unshift("%s/../lib" % File.dirname(__FILE__)) $:.unshift("%s/../lib" % File.dirname(__FILE__))
@ -6,22 +9,49 @@ $:.unshift(File.dirname(__FILE__))
require "eventmachine" require "eventmachine"
require "json" require "json"
require "lib/elasticsearch" require "logstash/search/elasticsearch"
require "logstash/search/query"
require "logstash/namespace" require "logstash/namespace"
require "rack" require "rack"
require "rubygems" require "rubygems"
require "sinatra/async" require "sinatra/async"
require "logstash/web/helpers/require_param"
class EventMachine::ConnectionError < RuntimeError; end class EventMachine::ConnectionError < RuntimeError; end
module LogStash::Web; end
class LogStash::Web::Server < Sinatra::Base class LogStash::Web::Server < Sinatra::Base
register Sinatra::Async register Sinatra::Async
helpers Sinatra::RequireParam # logstash/web/helpers/require_param
set :haml, :format => :html5 set :haml, :format => :html5
set :logging, true set :logging, true
set :public, "#{File.dirname(__FILE__)}/public" set :public, "#{File.dirname(__FILE__)}/public"
set :views, "#{File.dirname(__FILE__)}/views" set :views, "#{File.dirname(__FILE__)}/views"
elasticsearch = LogStash::Web::ElasticSearch.new
use Rack::CommonLogger
#use Rack::ShowExceptions
def initialize(settings={})
super
# TODO(sissel): Support alternate backends
backend_url = URI.parse(settings.backend_url)
case backend_url.scheme
when "elasticsearch"
@backend = LogStash::Search::ElasticSearch.new(
:host => backend_url.host,
:port => backend_url.port
)
when "twitter"
require "logstash/search/twitter"
@backend = LogStash::Search::Twitter.new(
:host => backend_url.host,
:port => backend_url.port
)
end # backend_url.scheme
end # def initialize
aget '/style.css' do aget '/style.css' do
headers "Content-Type" => "text/css; charset=utf8" headers "Content-Type" => "text/css; charset=utf8"
body sass :style body sass :style
@ -32,8 +62,11 @@ class LogStash::Web::Server < Sinatra::Base
end # '/' end # '/'
aget '/search' do aget '/search' do
result_callback = proc do result_callback = proc do |results|
status 500 if @error status 500 if @error
@results = results
p :got => results
params[:format] ||= "html" params[:format] ||= "html"
case params[:format] case params[:format]
@ -48,10 +81,10 @@ class LogStash::Web::Server < Sinatra::Base
body erb :"search/results.txt", :layout => false body erb :"search/results.txt", :layout => false
when "json" when "json"
headers({"Content-Type" => "text/plain" }) headers({"Content-Type" => "text/plain" })
# TODO(sissel): issue/30 - needs refactoring here.
hits = @hits.collect { |h| h["_source"] } hits = @hits.collect { |h| h["_source"] }
response = { response = {
"hits" => hits, "hits" => hits,
"facets" => (@results["facets"] rescue nil),
} }
response["error"] = @error if @error response["error"] = @error if @error
@ -63,43 +96,79 @@ class LogStash::Web::Server < Sinatra::Base
# have javascript enabled, we need to show the results in # have javascript enabled, we need to show the results in
# case a user doesn't have javascript. # case a user doesn't have javascript.
if params[:q] and params[:q] != "" if params[:q] and params[:q] != ""
elasticsearch.search(params) do |results| query = LogStash::Search::Query.new(
@results = results :query_string => params[:q],
@hits = (@results["hits"]["hits"] rescue []) :offset => params[:offset],
:count => params[:count]
)
@backend.search(query) do |results|
p :got => results
begin begin
result_callback.call result_callback.call results
rescue => e rescue => e
puts e p :exception => e
end end
end # elasticsearch.search end # @backend.search
else else
#@error = "No query given." results = LogStash::Search::Result.new(
@hits = [] :events => [],
result_callback.call :error_message => "No query given"
)
result_callback.call results
end end
end # aget '/search' end # aget '/search'
apost '/search/ajax' do apost '/api/search' do
api_search
end # apost /api/search
aget '/api/search' do
api_search
end # aget /api/search
def api_search
headers({"Content-Type" => "text/html" }) headers({"Content-Type" => "text/html" })
count = params["count"] = (params["count"] or 50).to_i count = params["count"] = (params["count"] or 50).to_i
offset = params["offset"] = (params["offset"] or 0).to_i offset = params["offset"] = (params["offset"] or 0).to_i
elasticsearch.search(params) do |results| format = (params[:format] or "json")
query = LogStash::Search::Query.new(
:query_string => params[:q],
:offset => offset,
:count => count
)
@backend.search(query) do |results|
@results = results @results = results
if @results.include?("error") if @results.error?
body haml :"search/error", :layout => !request.xhr? status 500
case format
when "html"
headers({"Content-Type" => "text/html" })
body haml :"search/error", :layout => !request.xhr?
when "text"
headers({"Content-Type" => "text/plain" })
body erb :"search/error.txt", :layout => false
when "txt"
headers({"Content-Type" => "text/plain" })
body erb :"search/error.txt", :layout => false
when "json"
headers({"Content-Type" => "text/plain" })
# TODO(sissel): issue/30 - needs refactoring here.
if @results.error?
body({ "error" => @results.error_message }.to_json)
else
body @results.to_json
end
end # case params[:format]
next next
end end
@hits = (@results["hits"]["hits"] rescue []) @events = @results.events
@total = (@results["hits"]["total"] rescue 0) @total = (@results.total rescue 0)
@graphpoints = [] count = @results.events.size
begin
@results["facets"]["by_hour"]["entries"].each do |entry|
@graphpoints << [entry["key"], entry["count"]]
end
rescue => e
puts e
end
if count and offset if count and offset
if @total > (count + offset) if @total > (count + offset)
@ -115,7 +184,7 @@ class LogStash::Web::Server < Sinatra::Base
next_params["offset"] = [offset + count, @total - count].min next_params["offset"] = [offset + count, @total - count].min
@next_href = "?" + next_params.collect { |k,v| [URI.escape(k.to_s), URI.escape(v.to_s)].join("=") }.join("&") @next_href = "?" + next_params.collect { |k,v| [URI.escape(k.to_s), URI.escape(v.to_s)].join("=") }.join("&")
last_params = next_params.clone last_params = next_params.clone
last_params["offset"] = @total - offset last_params["offset"] = @total - count
@last_href = "?" + last_params.collect { |k,v| [URI.escape(k.to_s), URI.escape(v.to_s)].join("=") }.join("&") @last_href = "?" + last_params.collect { |k,v| [URI.escape(k.to_s), URI.escape(v.to_s)].join("=") }.join("&")
end end
@ -124,24 +193,83 @@ class LogStash::Web::Server < Sinatra::Base
prev_params["offset"] = [offset - count, 0].max prev_params["offset"] = [offset - count, 0].max
@prev_href = "?" + prev_params.collect { |k,v| [URI.escape(k.to_s), URI.escape(v.to_s)].join("=") }.join("&") @prev_href = "?" + prev_params.collect { |k,v| [URI.escape(k.to_s), URI.escape(v.to_s)].join("=") }.join("&")
if prev_params["offset"] > 0 #if prev_params["offset"] > 0
first_params = prev_params.clone first_params = prev_params.clone
first_params["offset"] = 0 first_params["offset"] = 0
@first_href = "?" + first_params.collect { |k,v| [URI.escape(k.to_s), URI.escape(v.to_s)].join("=") }.join("&") @first_href = "?" + first_params.collect { |k,v| [URI.escape(k.to_s), URI.escape(v.to_s)].join("=") }.join("&")
end #end
end end
body haml :"search/ajax", :layout => !request.xhr? # TODO(sissel): make a helper function taht goes hash -> cgi querystring
end # elasticsearch.search @refresh_href = "?" + params.collect { |k,v| [URI.escape(k.to_s), URI.escape(v.to_s)].join("=") }.join("&")
end # apost '/search/ajax'
case format
when "html"
headers({"Content-Type" => "text/html" })
body haml :"search/ajax", :layout => !request.xhr?
when "text"
headers({"Content-Type" => "text/plain" })
body erb :"search/results.txt", :layout => false
when "txt"
headers({"Content-Type" => "text/plain" })
body erb :"search/results.txt", :layout => false
when "json"
headers({"Content-Type" => "text/plain" })
# TODO(sissel): issue/30 - needs refactoring here.
response = @results
body response.to_json
end # case params[:format]
end # @backend.search
end # def api_search
aget '/api/histogram' do
headers({"Content-Type" => "text/plain" })
missing = require_param(:q)
if !missing.empty?
status 500
body({ "error" => "Missing requiremed parameters",
"missing" => missing }.to_json)
next
end # if !missing.empty?
format = (params[:format] or "json") # default json
field = (params[:field] or "@timestamp") # default @timestamp
interval = (params[:interval] or 3600000).to_i # default 1 hour
@backend.histogram(params[:q], field, interval) do |results|
@results = results
if @results.error?
status 500
body({ "error" => @results.error_message }.to_json)
next
end
begin
a = results.results.to_json
rescue => e
status 500
body e.inspect
p :exception => e
p e
raise e
end
status 200
body a
end # @backend.search
end # aget '/api/histogram'
aget '/*' do
status 404 if @error
body "Invalid path."
end # aget /*
end # class LogStash::Web::Server end # class LogStash::Web::Server
require "optparse" require "optparse"
Settings = Struct.new(:daemonize, :logfile, :address, :port) Settings = Struct.new(:daemonize, :logfile, :address, :port, :backend_url)
settings = Settings.new settings = Settings.new
settings.address = "0.0.0.0" settings.address = "0.0.0.0"
settings.port = 9292 settings.port = 9292
settings.backend_url = "elasticsearch://localhost:9200/"
progname = File.basename($0) progname = File.basename($0)
@ -163,6 +291,11 @@ opts = OptionParser.new do |opts|
opts.on("-p", "--port PORT", "Port on which to start webserver. Default is 9292.") do |port| opts.on("-p", "--port PORT", "Port on which to start webserver. Default is 9292.") do |port|
settings.port = port.to_i settings.port = port.to_i
end end
opts.on("-b", "--backend URL",
"The backend URL to use. Default is elasticserach://localhost:9200/") do |url|
settings.backend_url = url
end
end end
opts.parse! opts.parse!
@ -189,5 +322,5 @@ end
Rack::Handler::Thin.run( Rack::Handler::Thin.run(
Rack::CommonLogger.new( \ Rack::CommonLogger.new( \
Rack::ShowExceptions.new( \ Rack::ShowExceptions.new( \
LogStash::Web::Server.new)), LogStash::Web::Server.new(settings))),
:Port => settings.port, :Host => settings.address) :Port => settings.port, :Host => settings.address)

View file

@ -4,7 +4,7 @@
%title= @title || "logstash" %title= @title || "logstash"
%link{ :rel => "stylesheet", :href => "/style.css", :type => "text/css" } %link{ :rel => "stylesheet", :href => "/style.css", :type => "text/css" }
%link{ :rel => "stylesheet", :href => "/css/smoothness/jquery-ui-1.8.5.custom.css", :type => "text/css" } %link{ :rel => "stylesheet", :href => "/css/smoothness/jquery-ui-1.8.5.custom.css", :type => "text/css" }
%script{ :src => "https://ajax.googleapis.com/ajax/libs/jquery/1.4.3/jquery.min.js", %script{ :src => "https://ajax.googleapis.com/ajax/libs/jquery/1.5.0/jquery.min.js",
:type => "text/javascript" } :type => "text/javascript" }
%body %body
#header #header

View file

@ -2,34 +2,39 @@
- if (params[:q].strip.length > 0 rescue false) - if (params[:q].strip.length > 0 rescue false)
%h1 %h1
Search results for '#{params[:q]}' Search results for '#{params[:q]}'
- if @graphpoints
#visual
:javascript
$(function() {
var graphdata = #{@graphpoints.to_json};
window.logstash.plot(graphdata);
});
- if @total and @result_start and @result_end - if @total and @result_start and @result_end
%small %small
%strong %strong
Results #{@result_start} - #{@result_end} of #{@total} Results #{@result_start} - #{@result_end} of #{@results.total}
| |
- if @first_href - if @first_href
%a.pager{ :href => @first_href } first %a.pager{ :href => @first_href } first
| - else
%span.unavailable first
|
- if @prev_href - if @prev_href
%a.pager{ :href => @prev_href } %a.pager{ :href => @prev_href }
prev prev
- if @next_href - else
| %span.unavailable prev
|
- if @next_href - if @next_href
%a.pager{ :href => @next_href } %a.pager{ :href => @next_href }
next next
- else
%span.unavailable next
|
- if @last_href - if @last_href
|
%a.pager{ :href => @last_href } %a.pager{ :href => @last_href }
last last
- if @hits.length == 0 - else
%span.unavailable last
|
%a.pager{ :href => @refresh_href }
refresh
|
%span#querytime= "(%.3f seconds)" % @results.duration
- if @results.events.length == 0
- if !params[:q] - if !params[:q]
/ We default to a '+2 days' in the future to capture 'today at 00:00' / We default to a '+2 days' in the future to capture 'today at 00:00'
/ plus tomorrow, inclusive, in case you are 23 hours behind the international / plus tomorrow, inclusive, in case you are 23 hours behind the international
@ -42,8 +47,9 @@
%tr %tr
%th timestamp %th timestamp
%th event %th event
- @hits.reverse.each do |hit| - @results.events.reverse.each do |event|
%tr.event %tr.event
%td.timestamp&= hit["_source"]["@timestamp"] %td.timestamp&= event.timestamp
%td.message{ :"data-full" => hit.to_json } %td.message{ :"data-full" => event.to_json }
%pre&= hit["_source"]["@message"] %a{:href => "#"}
%pre&= event.message

View file

@ -1,3 +1,3 @@
#error #error
%h4 The query '#{params["q"]}' resulted the following error: %h4 The query '#{params["q"]}' resulted the following error:
%pre&= @results["error"] %pre&= @results.error_message

View file

@ -0,0 +1,4 @@
An error occured in query '<%= params[:q] %>'
ERROR:
<%= @results.error_message %>

View file

@ -14,4 +14,7 @@
for that event. You can also click on the graph to zoom to that time period. for that event. You can also click on the graph to zoom to that time period.
The query language is that of Lucene's string query (<a href="http://lucene.apache.org/java/2_4_0/queryparsersyntax.html">docs</a>). The query language is that of Lucene's string query (<a href="http://lucene.apache.org/java/2_4_0/queryparsersyntax.html">docs</a>).
#visual
=haml :"search/ajax", :layout => false =haml :"search/ajax", :layout => false

View file

@ -1,9 +1,8 @@
<% <%
# Sinatra currently doesn't do ERB with newline trimming, so we # Sinatra currently doesn't do ERB with newline trimming, so we
# have to write this funky mishmosh that is hard to read. # have to write this funky mishmosh on one line that is hard to read.
if @error %>Error: <%= @error %><% else if @results.error? %>Error: <%= @results.error_message%><% else
@hits.each do |hit| @results.events.each do |event|
event = LogStash::Event.new(hit["_source"])
%><%= event.message || event.to_hash.to_json %> %><%= event.message || event.to_hash.to_json %>
<% end <% end
end end

View file

@ -29,6 +29,9 @@ body
pre pre
white-space: pre-wrap white-space: pre-wrap
margin: 0 margin: 0
a
text-decoration: none
color: black
#content td.timestamp #content td.timestamp
white-space: nowrap white-space: nowrap
padding: 1px padding: 1px
@ -54,8 +57,11 @@ body
margin: 0 margin: 0
#inspector #inspector
font-size: 70% font-size: 70%
#results #visual #visual
width: 850px width: 850px
height: 200px height: 200px
display: none
#results h1 #results h1
font-size: 100% font-size: 100%
img.throbber
vertical-align: top

View file

@ -5,7 +5,8 @@ $:.unshift File.dirname(__FILE__) + "/../../"
require "logstash/testcase" require "logstash/testcase"
require "logstash/agent" require "logstash/agent"
require "logstash/logging" require "logstash/logging"
require "logstash/web/lib/elasticsearch" require "logstash/search/elasticsearch"
require "logstash/search/query"
# For checking elasticsearch health # For checking elasticsearch health
require "net/http" require "net/http"
@ -85,54 +86,60 @@ class TestOutputElasticSearch < LogStash::TestCase
EventMachine::run do EventMachine::run do
em_setup em_setup
events = [] # TODO(sissel): I think em-http-request may cross signals somehow
myfile = File.basename(__FILE__) # if there are multiple requests to the same host/port?
1.upto(5).each do |i| # Confusing. If we don't sleep here, then the setup fails and blows
events << LogStash::Event.new("@message" => "just another log rollin' #{i}", # a fail to configure exception.
"@source" => "logstash tests in #{myfile}") EventMachine::add_timer(3) do
end
# TODO(sissel): Need a way to hook when the agent is ready? events = []
EventMachine.next_tick do myfile = File.basename(__FILE__)
events.each do |e| 1.upto(5).each do |i|
@input.push e events << LogStash::Event.new("@message" => "just another log rollin' #{i}",
"@source" => "logstash tests in #{myfile}")
end end
end # next_tick, push our events
tries = 30 # TODO(sissel): Need a way to hook when the agent is ready?
EventMachine.add_periodic_timer(0.2) do EventMachine.next_tick do
es = LogStash::Web::ElasticSearch.new(:port => @port) events.each do |e|
es.search(:q => "*", :count => 5, :offset => 0) do |results| @input.push e
hits = (results["hits"]["hits"] rescue []) end
if events.size == hits.size end # next_tick, push our events
puts "Found #{hits.size} events, ready to verify!"
expected = events.clone tries = 30
assert_equal(events.size, hits.size) EventMachine.add_periodic_timer(0.2) do
events.each { |e| p :expect => e } es = LogStash::Search::ElasticSearch.new(:port => @port, :host => "localhost")
hits.each do |hit| query = LogStash::Search::Query.new(:query_string => "*", :count => 5)
event = LogStash::Event.new(hit["_source"]) es.search(query) do |result|
p :got => event if events.size == result.events.size
assert(expected.include?(event), "Found event in results that was not expected: #{event.inspect}\n\nExpected: #{events.map{ |a| a.inspect }.join("\n")}") puts "Found #{result.events.size} events, ready to verify!"
end expected = events.clone
EventMachine.stop_event_loop assert_equal(events.size, result.events.size)
next # break out events.each { |e| p :expect => e }
else result.events.each do |event|
tries -= 1 p :got => event
if tries <= 0 assert(expected.include?(event), "Found event in results that was not expected: #{event.inspect}\n\nExpected: #{events.map{ |a| a.inspect }.join("\n")}")
assert(false, "Gave up trying to query elasticsearch. Maybe we aren't indexing properly?") end
EventMachine.stop_event_loop EventMachine.stop_event_loop
end next # break out
end # if events.size == hits.size else
end # es.search tries -= 1
end # add_periodic_timer(0.2) / query elasticsearch if tries <= 0
assert(false, "Gave up trying to query elasticsearch. Maybe we aren't indexing properly?")
EventMachine.stop_event_loop
end
end # if events.size == hits.size
end # es.search
end # add_periodic_timer(0.2) / query elasticsearch
end # sleep for 3 seconds before going to allow the registration to work.
end # EventMachine::run end # EventMachine::run
end # def test_elasticsearch_basic end # def test_elasticsearch_basic
end # class TestOutputElasticSearch end # class TestOutputElasticSearch
class TestOutputElasticSearch0_13_1 < TestOutputElasticSearch #class TestOutputElasticSearch0_13_1 < TestOutputElasticSearch
ELASTICSEARCH_VERSION = self.name[/[0-9_]+/].gsub("_", ".") #ELASTICSEARCH_VERSION = self.name[/[0-9_]+/].gsub("_", ".")
end # class TestOutputElasticSearch0_13_1 #end # class TestOutputElasticSearch0_13_1
#
class TestOutputElasticSearch0_12_0 < TestOutputElasticSearch #class TestOutputElasticSearch0_12_0 < TestOutputElasticSearch
ELASTICSEARCH_VERSION = self.name[/[0-9_]+/].gsub("_", ".") #ELASTICSEARCH_VERSION = self.name[/[0-9_]+/].gsub("_", ".")
end # class TestOutputElasticSearch0_12_0 #end # class TestOutputElasticSearch0_12_0