- Fix possible twitter stream api errors

- explicitly create indeces in ES
- fix default query
This commit is contained in:
Jordan Sissel 2011-01-07 16:28:31 -08:00
parent 46305a05c5
commit b88c1b5e3b
3 changed files with 51 additions and 4 deletions

View file

@ -18,7 +18,13 @@ class LogStash::Inputs::Twitter < LogStash::Inputs::Base
req.stream do |chunk|
buffer.extract(chunk).each do |line|
begin
tweet = JSON.parse(line)
rescue JSON::ParserError => e
@logger.warn("Invalid JSON, aborting connection: #{line}")
req.errback
next
end
next if !tweet
event = LogStash::Event.new({
@ -49,7 +55,9 @@ class LogStash::Inputs::Twitter < LogStash::Inputs::Base
req.errback do
@logger.warn(["Error occurred, not sure what, seriously. Reconnecting!", { :url => @url }])
EventMachine::Timer.new(15) do
req.close_connection() rescue nil
EventMachine::Timer.new(60) do
connect.call
end
end # req.errback

View file

@ -15,6 +15,45 @@ class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
defaults = {"method" => "http"}
params = defaults.merge(@urlopts)
# Describe this index to elasticsearch
indexmap = {
# The name of the index
"settings": {
@httpurl.path.split("/")[-1] => {
"properties" => {
"@source" => { "type" => "string" },
"@source_host" => { "type" => "string" },
"@source_path" => { "type" => "string" },
"@timestamp" => { "type" => "date" },
"@tags" => { "type" => "string" },
"@message" => { "type" => "string" },
# TODO(sissel): Hack for now until this bug is resolved:
# https://github.com/elasticsearch/elasticsearch/issues/issue/604
"@fields" => {
"type": "object"
"properties" => {
"HOSTNAME" => { "type" => "string" },
},
}, # "@fields"
}, # "properties"
}, # index map for this index type.
}, # "settings"
} # ES Index
indexurl = @httpurl.to_s + "/_mapping"
indexmap_http = EventMachine::HttpRequest.new(indexurl)
indexmap_req = indexmap_http.put :body => indexmap.to_json
indexmap_req.callback do
@logger.info(["Done configuring index", indexurl, indexmap])
ready(params)
end
indexmap_req.errback do
@logger.warn(["Failure configuring index", @httpurl.to_s, indexmap])
end
end # def register
def ready(params)
case params["method"]
when "http"
@logger.debug "ElasticSearch using http with URL #{@httpurl.to_s}"
@ -50,7 +89,7 @@ class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
end
else raise "unknown elasticsearch method #{params["method"].inspect}"
end
end # def register
end # def ready
def receive(event)
@callback.call(event)

View file

@ -31,7 +31,7 @@
last
- if @hits.length == 0
- if !params[:q]
%h3#querystatus No query given. How about <a href="?q=* @timestamp:[#{(Time.now + 24*60*60).strftime("%Y-%m-%d")} TO #{(Time.now - 7*24*60*60).strftime("%Y-%m-%d")}]" class="querychanger">this?</a>
%h3#querystatus No query given. How about <a href="?q=* @timestamp:[#{(Time.now - 7*24*60*60).strftime("%Y-%m-%d")} TO #{(Time.now + 24*60*60).strftime("%Y-%m-%d")}]" class="querychanger">this?</a>
- else
%h3#querystatus No results for query '#{params[:q]}'
- else