diff --git a/bin/logstash-test b/bin/logstash-test
index af76b75d3..40642aa94 100755
--- a/bin/logstash-test
+++ b/bin/logstash-test
@@ -36,6 +36,8 @@ def check_libraries
"needed for websocket output")
results << check_lib("rack", "rack", true,
"needed for logstash-web")
+ results << check_lib("thin", "thin", true,
+ "needed for logstash-web")
results << check_lib("amqp", "amqp", true,
"needed for AMQP input and output")
results << check_lib("sinatra/async", "async_sinatra", true,
@@ -46,6 +48,8 @@ def check_libraries
"improve logstash debug logging output")
results << check_lib("eventmachine", "eventmachine", false,
"required for logstash to function")
+ results << check_lib("json", "json", false,
+ "required for logstash to function")
missing_required = results.count { |r| !r[:optional] and !r[:found] }
if missing_required == 0
@@ -66,6 +70,8 @@ end
def main(args)
report_ruby_version
+ # TODO(sissel): Add a way to call out specific things to test, like
+ # logstash-web, elasticsearch, mongodb, syslog, etc.
if !check_libraries
puts "Library check failed."
return 1
diff --git a/lib/logstash/namespace.rb b/lib/logstash/namespace.rb
index a88cf5fb6..91f50ebe1 100644
--- a/lib/logstash/namespace.rb
+++ b/lib/logstash/namespace.rb
@@ -2,4 +2,5 @@ module LogStash
module Inputs; end
module Outputs; end
module Filters; end
+ module Search; end
end # module LogStash
diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb
index 536ec77d1..4eca6f3c3 100644
--- a/lib/logstash/outputs/elasticsearch.rb
+++ b/lib/logstash/outputs/elasticsearch.rb
@@ -41,6 +41,9 @@ class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
}, # "settings"
} # ES Index
+ #puts :waiting
+ puts @esurl.to_s
+ #sleep 10
indexurl = @esurl.to_s
indexmap_http = EventMachine::HttpRequest.new(indexurl)
indexmap_req = indexmap_http.put :body => indexmap.to_json
@@ -49,8 +52,12 @@ class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
ready(params)
end
indexmap_req.errback do
- @logger.warn(["Failure configuring index", @esurl.to_s, indexmap])
+ @logger.warn(["Failure configuring index (http failed to connect?)",
+ @esurl.to_s, indexmap])
+ @logger.warn([indexmap_req])
+ #sleep 30
raise "Failure configuring index: #{@esurl.to_s}"
+
end
end # def register
diff --git a/lib/logstash/search/base.rb b/lib/logstash/search/base.rb
new file mode 100644
index 000000000..8abb01cc9
--- /dev/null
+++ b/lib/logstash/search/base.rb
@@ -0,0 +1,39 @@
+
+require "logstash/namespace"
+require "logstash/logging"
+require "logstash/event"
+
+class LogStash::Search::Base
+ # Do a search.
+ #
+ # This method is async. You can expect a block and therefore
+ # should yield a result, not return one.
+ #
+ # Implementations should yield a LogStash::Search::Result
+ # LogStash::Search::Result#events must be an array of LogStash::Event
+ def search(query)
+ raise "The class #{self.class.name} must implement the 'search' method."
+ end # def search
+
+ # Yields a histogram by field of a query.
+ #
+ # This method is async. You should expect a block to be passed and therefore
+ # should yield a result, not return one.
+ #
+ # Implementations should yield a LogStash::Search::FacetResult::Histogram
+ def histogram(query, field, interval=nil)
+ raise "The class #{self.class.name} must implement the 'histogram' method."
+ end
+
+ # Returns a list of popular terms from a query
+ # TODO(sissel): Implement
+ def popular_terms(query, fields, count=10)
+ raise "The class #{self.class.name} must implement the 'popular_terms' method."
+ end
+
+ # Count the results given by a query.
+ def count(query)
+ raise "The class #{self.class.name} must implement the 'count' method."
+ end
+
+end # class LogStash::Search::Base
diff --git a/lib/logstash/search/elasticsearch.rb b/lib/logstash/search/elasticsearch.rb
new file mode 100644
index 000000000..2603b53f3
--- /dev/null
+++ b/lib/logstash/search/elasticsearch.rb
@@ -0,0 +1,196 @@
+
+require "em-http-request"
+require "logstash/namespace"
+require "logstash/logging"
+require "logstash/event"
+require "logstash/search/base"
+require "logstash/search/query"
+require "logstash/search/result"
+require "logstash/search/facetresult"
+require "logstash/search/facetresult/histogram"
+
+class LogStash::Search::ElasticSearch < LogStash::Search::Base
+ public
+ def initialize(settings={})
+ @host = (settings[:host] || "localhost")
+ @port = (settings[:port] || 9200).to_i
+ @logger = LogStash::Logger.new(STDOUT)
+ end
+
+ # See LogStash::Search;:Base#search
+ public
+ def search(query)
+ raise "No block given for search call." if !block_given?
+ if query.is_a?(String)
+ query = LogStash::Search::Query.parse(query)
+ end
+
+ # TODO(sissel): only search a specific index?
+ http = EventMachine::HttpRequest.new("http://#{@host}:#{@port}/_search")
+
+ @logger.info(["Query", query])
+ esreq = {
+ "sort" => [
+ { "@timestamp" => "desc" }
+ ],
+ "query" => {
+ "query_string" => {
+ "query" => query.query_string,
+ "default_operator" => "AND"
+ } # query_string
+ }, # query
+ "from" => query.offset,
+ "size" => query.count
+ } # elasticsearch request
+
+ @logger.info("ElasticSearch Query: #{esreq.to_json}")
+ start_time = Time.now
+ req = http.get :body => esreq.to_json
+ result = LogStash::Search::Result.new
+ req.callback do
+ data = JSON.parse(req.response)
+ result.duration = Time.now - start_time
+
+ hits = data["hits"]["hits"] rescue nil
+
+ if hits.nil? or !data["error"].nil?
+ # Use the error message if any, otherwise, return the whole
+ # data object as json as the error message for debugging later.
+ result.error_message = (data["error"] rescue false) || data.to_json
+ yield result
+ next
+ end
+
+ @logger.info(["Got search results",
+ { :query => query.query_string, :duration => data["duration"],
+ :result_count => hits.size }])
+ if req.response_header.status != 200
+ result.error_message = data["error"] || req.inspect
+ @error = data["error"] || req.inspect
+ end
+
+ # We want to yield a list of LogStash::Event objects.
+ hits.each do |hit|
+ result.events << LogStash::Event.new(hit["_source"])
+ end
+
+ # Total hits this search could find if not limited
+ result.total = data["hits"]["total"]
+ result.offset = query.offset
+
+ yield result
+ end
+
+ req.errback do
+ @logger.warn(["Query failed", query, req, req.response])
+ result.duration = Time.now - start_time
+ result.error_message = req.response
+ #yield result
+
+ yield({ "error" => req.response })
+ end
+ end # def search
+
+ # See LogStash::Search;:Base#histogram
+ public
+ def histogram(query, field, interval=nil)
+ if query.is_a?(String)
+ query = LogStash::Search::Query.parse(query)
+ end
+
+ # TODO(sissel): only search a specific index?
+ http = EventMachine::HttpRequest.new("http://#{@host}:#{@port}/_search")
+
+ @logger.info(["Query", query])
+ histogram_settings = {
+ "field" => field
+ }
+
+ if !interval.nil? && interval.is_a?(Numeric)
+ histogram_settings["interval"] = interval
+ end
+
+ esreq = {
+ "query" => {
+ "query_string" => {
+ "query" => query.query_string,
+ "default_operator" => "AND"
+ } # query_string
+ }, # query
+ "from" => 0,
+ "size" => 0,
+ "facets" => {
+ "amazingpants" => { # just a name for this histogram...
+ "histogram" => histogram_settings,
+ },
+ },
+ } # elasticsearch request
+
+ @logger.info("ElasticSearch Facet Query: #{esreq.to_json}")
+ start_time = Time.now
+ req = http.get :body => esreq.to_json
+ result = LogStash::Search::FacetResult.new
+ req.callback do
+ data = JSON.parse(req.response)
+ result.duration = Time.now - start_time
+
+ @logger.info(["Got search results",
+ { :query => query.query_string, :duration => data["duration"] }])
+ if req.response_header.status != 200
+ result.error_message = data["error"] || req.inspect
+ @error = data["error"] || req.inspect
+ end
+
+ entries = data["facets"]["amazingpants"]["entries"] rescue nil
+
+ if entries.nil? or !data["error"].nil?
+ # Use the error message if any, otherwise, return the whole
+ # data object as json as the error message for debugging later.
+ result.error_message = (data["error"] rescue false) || data.to_json
+ yield result
+ next
+ end
+ entries.each do |entry|
+ # entry is a hash of keys 'total', 'mean', 'count', and 'key'
+ hist_entry = LogStash::Search::FacetResult::Histogram.new
+ hist_entry.key = entry["key"]
+ hist_entry.count = entry["count"]
+ result.results << hist_entry
+ end # for each histogram result
+ yield result
+ end # request callback
+
+ req.errback do
+ @logger.warn(["Query failed", query, req, req.response])
+ result.duration = Time.now - start_time
+ result.error_message = req.response
+ yield result
+ #yield({ "error" => req.response })
+ end
+ end
+
+ # Not used. Needs refactoring elsewhere.
+ private
+ def __anonymize
+ # TODO(sissel): Plugin-ify this (Search filters!)
+ # TODO(sissel): Implement
+ # Search anonymization
+ #require "digest/md5"
+ #data["hits"]["hits"].each do |hit|
+ [].each do |hit|
+ event = LogStash::Event.new(hit["_source"])
+ event.to_hash.each do |key, value|
+ next unless value.is_a?(String)
+ value.gsub!(/[^ ]+\.loggly\.net/) { |match| "loggly-" + Digest::MD5.hexdigest(match)[0..6] + ".example.com"}
+ end
+
+ event.fields.each do |key, value|
+ value = [value] if value.is_a?(String)
+ next unless value.is_a?(Array)
+ value.each do |v|
+ v.gsub!(/[^ ]+\.loggly\.net/) { |match| "loggly-" + Digest::MD5.hexdigest(match)[0..6] + ".example.com"}
+ end # value.each
+ end # hit._source.@fields.each
+ end # data.hits.hits.each
+ end # def __anonymize
+end # class LogStash::Search::ElasticSearch
diff --git a/lib/logstash/search/facetresult.rb b/lib/logstash/search/facetresult.rb
new file mode 100644
index 000000000..c42d76ee9
--- /dev/null
+++ b/lib/logstash/search/facetresult.rb
@@ -0,0 +1,25 @@
+
+require "logstash/namespace"
+require "logstash/logging"
+
+class LogStash::Search::FacetResult
+ # Array of LogStash::Search::FacetResult::Entry
+ attr_accessor :results
+
+ # How long this query took, in seconds (or fractions of).
+ attr_accessor :duration
+
+ # Error message, if any.
+ attr_accessor :error_message
+
+ def initialize(settings={})
+ @results = []
+ @duration = nil
+ @error_message = nil
+ end
+
+ def error?
+ return !@error_message.nil?
+ end
+end # class LogStash::Search::FacetResult
+
diff --git a/lib/logstash/search/facetresult/entry.rb b/lib/logstash/search/facetresult/entry.rb
new file mode 100644
index 000000000..f09decca1
--- /dev/null
+++ b/lib/logstash/search/facetresult/entry.rb
@@ -0,0 +1,6 @@
+
+require "logstash/search/facetresult"
+
+class LogStash::Search::FacetResult::Entry
+ # nothing here
+end # class LogStash::Search::FacetResult::Entry
diff --git a/lib/logstash/search/facetresult/histogram.rb b/lib/logstash/search/facetresult/histogram.rb
new file mode 100644
index 000000000..1851334d5
--- /dev/null
+++ b/lib/logstash/search/facetresult/histogram.rb
@@ -0,0 +1,21 @@
+
+require "json"
+require "logstash/search/facetresult/entry"
+
+class LogStash::Search::FacetResult::Histogram < LogStash::Search::FacetResult::Entry
+ # The name or key for this result.
+ attr_accessor :key
+ attr_accessor :mean
+ attr_accessor :total
+ attr_accessor :count
+
+ # sometimes a parent call to to_json calls us with args?
+ def to_json(*args)
+ return {
+ "key" => @key,
+ "mean" => @mean,
+ "total" => @total,
+ "count" => @count,
+ }.to_json
+ end
+end
diff --git a/lib/logstash/search/query.rb b/lib/logstash/search/query.rb
new file mode 100644
index 000000000..5013373d5
--- /dev/null
+++ b/lib/logstash/search/query.rb
@@ -0,0 +1,35 @@
+require "logstash/namespace"
+require "logstash/logging"
+
+class LogStash::Search::Query
+ # The query string
+ attr_accessor :query_string
+
+ # The offset to start at (like SQL's SELECT ... OFFSET n)
+ attr_accessor :offset
+
+ # The max number of results to return. (like SQL's SELECT ... LIMIT n)
+ attr_accessor :count
+
+ # New query object.
+ #
+ # 'settings' should be a hash containing:
+ #
+ # * :query_string - a string query for searching
+ # * :offset - (optional, default 0) offset to search from
+ # * :count - (optional, default 50) max number of results to return
+ def initialize(settings)
+ @query_string = settings[:query_string]
+ @offset = settings[:offset] || 0
+ @count = settings[:count] || 50
+ end
+
+ # Class method. Parses a query string and returns
+ # a LogStash::Search::Query instance
+ def self.parse(query_string)
+ # TODO(sissel): I would prefer not to invent my own query language.
+ # Can we be similar to Lucene, SQL, or other query languages?
+ return self.new(:query_string => query_string)
+ end
+
+end # class LogStash::Search::Query
diff --git a/lib/logstash/search/result.rb b/lib/logstash/search/result.rb
new file mode 100644
index 000000000..a273a955e
--- /dev/null
+++ b/lib/logstash/search/result.rb
@@ -0,0 +1,39 @@
+require "logstash/namespace"
+require "logstash/logging"
+
+class LogStash::Search::Result
+ # Array of LogStash::Event of results
+ attr_accessor :events
+
+ # How long this query took, in seconds (or fractions of).
+ attr_accessor :duration
+
+ # Offset in search
+ attr_accessor :offset
+
+ # Total records matched by this query, regardless of offset/count in query.
+ attr_accessor :total
+
+ # Error message, if any.
+ attr_accessor :error_message
+
+ def initialize(settings={})
+ @events = []
+ @duration = nil
+ @error_message = nil
+ end
+
+ def error?
+ return !@error_message.nil?
+ end
+
+ def to_json
+ return {
+ "events" => @events,
+ "duration" => @duration,
+ "offset" => @offset,
+ "total" => @total,
+ }.to_json
+ end # def to_json
+end # class LogStash::Search::Result
+
diff --git a/lib/logstash/search/twitter.rb b/lib/logstash/search/twitter.rb
new file mode 100644
index 000000000..643fa19b5
--- /dev/null
+++ b/lib/logstash/search/twitter.rb
@@ -0,0 +1,90 @@
+require "em-http-request"
+require "logstash/namespace"
+require "logstash/logging"
+require "logstash/event"
+require "logstash/search/base"
+require "logstash/search/query"
+require "logstash/search/result"
+require "logstash/search/facetresult"
+require "logstash/search/facetresult/histogram"
+
+class LogStash::Search::Twitter < LogStash::Search::Base
+ public
+ def initialize(settings={})
+ @host = (settings[:host] || "search.twitter.com")
+ @port = (settings[:port] || 80).to_i
+ @logger = LogStash::Logger.new(STDOUT)
+ end
+
+ public
+ def search(query)
+ raise "No block given for search call." if !block_given?
+ if query.is_a?(String)
+ query = LogStash::Search::Query.parse(query)
+ end
+
+ # TODO(sissel): only search a specific index?
+ http = EventMachine::HttpRequest.new("http://#{@host}:#{@port}/search.json?q=#{URI.escape(query.query_string)}&rpp=#{URI.escape(query.count) rescue query.count}")
+
+ @logger.info(["Query", query])
+
+ start_time = Time.now
+ req = http.get
+
+ result = LogStash::Search::Result.new
+ req.callback do
+ data = JSON.parse(req.response)
+ result.duration = Time.now - start_time
+
+ hits = (data["results"] || nil) rescue nil
+
+ if hits.nil? or !data["error"].nil?
+ # Use the error message if any, otherwise, return the whole
+ # data object as json as the error message for debugging later.
+ result.error_message = (data["error"] rescue false) || data.to_json
+ yield result
+ next
+ end
+
+ hits.each do |hit|
+ hit["@message"] = hit["text"]
+ hit["@timestamp"] = hit["created_at"]
+ hit.delete("text")
+ end
+
+ @logger.info(["Got search results",
+ { :query => query.query_string, :duration => data["duration"],
+ :result_count => hits.size }])
+
+ if req.response_header.status != 200
+ result.error_message = data["error"] || req.inspect
+ @error = data["error"] || req.inspect
+ end
+
+ # We want to yield a list of LogStash::Event objects.
+ hits.each do |hit|
+ result.events << LogStash::Event.new(hit)
+ end
+
+ # Total hits this search could find if not limited
+ result.total = hits.size
+ result.offset = 0
+
+ yield result
+ end
+
+ req.errback do
+ @logger.warn(["Query failed", query, req, req.response])
+ result.duration = Time.now - start_time
+ result.error_message = req.response
+
+ yield result
+ end
+ end # def search
+
+ def histogram(query, field, interval=nil)
+ # Nothing to histogram.
+ result = LogStash::Search::FacetResult.new
+ yield result
+ end
+end # class LogStash::Search::ElasticSearch
diff --git a/lib/logstash/web/helpers/require_param.rb b/lib/logstash/web/helpers/require_param.rb
new file mode 100644
index 000000000..bfbd44c99
--- /dev/null
+++ b/lib/logstash/web/helpers/require_param.rb
@@ -0,0 +1,17 @@
+require "sinatra/base"
+
+module Sinatra
+ module RequireParam
+ def require_param(*fields)
+ missing = []
+ fields.each do |field|
+ if params[field].nil?
+ missing << field
+ end
+ end
+ return missing
+ end # def require_param
+ end # module RequireParam
+
+ helpers RequireParam
+end # module Sinatra
diff --git a/lib/logstash/web/lib/elasticsearch.rb b/lib/logstash/web/lib/elasticsearch.rb
deleted file mode 100644
index 62efd7873..000000000
--- a/lib/logstash/web/lib/elasticsearch.rb
+++ /dev/null
@@ -1,86 +0,0 @@
-
-require "em-http-request"
-require "logstash/namespace"
-require "logstash/logging"
-require "logstash/event"
-
-module LogStash::Web; end
-
-class LogStash::Web::ElasticSearch
- public
- def initialize(settings)
- @port = (settings[:port] || 9200).to_i
- @logger = LogStash::Logger.new(STDOUT)
- end
-
- public
- def search(params)
- http = EventMachine::HttpRequest.new("http://localhost:#{@port}/_search")
- params[:offset] ||= 0
- params[:count] ||= 20
-
- @logger.info(["Query", params])
- esreq = {
- "sort" => [
- { "@timestamp" => "desc" }
- ],
- "query" => {
- "query_string" => {
- "query" => params[:q],
- "default_operator" => "AND"
- } # query_string
- }, # query
- "facets" => {
- "by_hour" => {
- "histogram" => {
- "field" => "@timestamp",
- "time_interval" => "1h",
- }, # histogram
- }, # by_hour
- }, # facets
- "from" => params[:offset],
- "size" => params[:count],
- }
-
- @logger.info("ElasticSearch Query: #{esreq.to_json}")
- start_time = Time.now
- req = http.get :body => esreq.to_json
- req.callback do
- #headers req.response_header
- data = JSON.parse(req.response)
- data["duration"] = Time.now - start_time
-
- # TODO(sissel): Plugin-ify this (Search filters!)
- # Search anonymization
- #require "digest/md5"
- #data["hits"]["hits"].each do |hit|
- [].each do |hit|
- event = LogStash::Event.new(hit["_source"])
- event.to_hash.each do |key, value|
- next unless value.is_a?(String)
- value.gsub!(/[^ ]+\.loggly\.net/) { |match| "loggly-" + Digest::MD5.hexdigest(match)[0..6] + ".example.com"}
- end
-
- event.fields.each do |key, value|
- value = [value] if value.is_a?(String)
- next unless value.is_a?(Array)
- value.each do |v|
- v.gsub!(/[^ ]+\.loggly\.net/) { |match| "loggly-" + Digest::MD5.hexdigest(match)[0..6] + ".example.com"}
- end # value.each
- end # hit._source.@fields.each
- end # data.hits.hits.each
-
- @logger.info(["Got search results",
- { :query => params[:q], :duration => data["duration"]}])
- #@logger.info(data)
- if req.response_header.status != 200
- @error = data["error"] || req.inspect
- end
- yield data
- end
- req.errback do
- @logger.warn(["Query failed", params, req, req.response])
- yield({ "error" => req.response })
- end
- end # def search
-end # class LogStash::Web::ElasticSearch
diff --git a/lib/logstash/web/public/js/logstash.js b/lib/logstash/web/public/js/logstash.js
index b30292af1..754e3432a 100644
--- a/lib/logstash/web/public/js/logstash.js
+++ b/lib/logstash/web/public/js/logstash.js
@@ -1,4 +1,6 @@
(function() {
+ // TODO(sissel): Write something that will use history.pushState and fall back
+ // to document.location.hash madness.
var logstash = {
params: {
@@ -6,21 +8,78 @@
count: 50,
},
- search: function(query) {
+ search: function(query, options) {
if (query == undefined || query == "") {
return;
}
- //console.log("Searching: " + query);
+
+ /* Default options */
+ if (typeof(options) == 'undefined') {
+ options = { graph: true };
+ }
var display_query = query.replace("<", "<").replace(">", ">")
- $("#querystatus").html("Loading query '" + display_query + "'")
+ $("#querystatus, #results h1").html("Loading query '" + display_query + "' (offset:" + logstash.params.offset + ", count:" + logstash.params.count + ") ")
//console.log(logstash.params)
logstash.params.q = query;
document.location.hash = escape(JSON.stringify(logstash.params));
- $("#results").load("/search/ajax", logstash.params);
+
+ /* Load the search results */
+ $("#results").load("/api/search?format=html", logstash.params);
+
+ if (options.graph != false) {
+ /* Load the default histogram graph */
+ logstash.params.interval = 3600000; /* 1 hour, default */
+ logstash.histogram();
+ } /* if options.graph != false */
$("#query").val(logstash.params.q);
}, /* search */
+ histogram: function(tries) {
+ if (typeof(tries) == 'undefined') {
+ tries = 7;
+ }
+
+ /* GeoCities mode on the graph while waiting ...
+ * This won't likely survive 1.0, but it's fun for now... */
+ $("#visual").html("