diff --git a/lib/logstash/inputs/twitter.rb b/lib/logstash/inputs/twitter.rb index 2062bfeec..a060d04bf 100644 --- a/lib/logstash/inputs/twitter.rb +++ b/lib/logstash/inputs/twitter.rb @@ -18,7 +18,13 @@ class LogStash::Inputs::Twitter < LogStash::Inputs::Base req.stream do |chunk| buffer.extract(chunk).each do |line| - tweet = JSON.parse(line) + begin + tweet = JSON.parse(line) + rescue JSON::ParserError => e + @logger.warn("Invalid JSON, aborting connection: #{line}") + req.errback + next + end next if !tweet event = LogStash::Event.new({ @@ -49,7 +55,9 @@ class LogStash::Inputs::Twitter < LogStash::Inputs::Base req.errback do @logger.warn(["Error occurred, not sure what, seriously. Reconnecting!", { :url => @url }]) - EventMachine::Timer.new(15) do + req.close_connection() rescue nil + + EventMachine::Timer.new(60) do connect.call end end # req.errback diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index c095a393a..116e5a09f 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -15,6 +15,45 @@ class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base defaults = {"method" => "http"} params = defaults.merge(@urlopts) + # Describe this index to elasticsearch + indexmap = { + # The name of the index + "settings": { + @httpurl.path.split("/")[-1] => { + "properties" => { + "@source" => { "type" => "string" }, + "@source_host" => { "type" => "string" }, + "@source_path" => { "type" => "string" }, + "@timestamp" => { "type" => "date" }, + "@tags" => { "type" => "string" }, + "@message" => { "type" => "string" }, + + # TODO(sissel): Hack for now until this bug is resolved: + # https://github.com/elasticsearch/elasticsearch/issues/issue/604 + "@fields" => { + "type": "object" + "properties" => { + "HOSTNAME" => { "type" => "string" }, + }, + }, # "@fields" + }, # "properties" + }, # index map for this index type. + }, # "settings" + } # ES Index + + indexurl = @httpurl.to_s + "/_mapping" + indexmap_http = EventMachine::HttpRequest.new(indexurl) + indexmap_req = indexmap_http.put :body => indexmap.to_json + indexmap_req.callback do + @logger.info(["Done configuring index", indexurl, indexmap]) + ready(params) + end + indexmap_req.errback do + @logger.warn(["Failure configuring index", @httpurl.to_s, indexmap]) + end + end # def register + + def ready(params) case params["method"] when "http" @logger.debug "ElasticSearch using http with URL #{@httpurl.to_s}" @@ -50,7 +89,7 @@ class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base end else raise "unknown elasticsearch method #{params["method"].inspect}" end - end # def register + end # def ready def receive(event) @callback.call(event) diff --git a/lib/logstash/web/views/search/ajax.haml b/lib/logstash/web/views/search/ajax.haml index 9f8f58767..9a9d07aa2 100644 --- a/lib/logstash/web/views/search/ajax.haml +++ b/lib/logstash/web/views/search/ajax.haml @@ -31,7 +31,7 @@ last - if @hits.length == 0 - if !params[:q] - %h3#querystatus No query given. How about this? + %h3#querystatus No query given. How about this? - else %h3#querystatus No results for query '#{params[:q]}' - else