add robustness to tweets stream handling

Fixes #1450
This commit is contained in:
Colin Surprenant 2014-06-18 18:10:14 +00:00 committed by Jordan Sissel
parent bf11118709
commit 10ac156f3e

View file

@@ -65,27 +65,41 @@ class LogStash::Inputs::Twitter < LogStash::Inputs::Base
   public
   def run(queue)
     @logger.info("Starting twitter tracking", :keywords => @keywords)
-    @client.filter(:track => @keywords.join(",")) do |tweet|
-      @logger.info? && @logger.info("Got tweet", :user => tweet.user.screen_name, :text => tweet.text)
-      if @full_tweet
-        event = LogStash::Event.new(LogStash::Util.stringify_symbols(tweet.to_hash))
-        event.timestamp = LogStash::Timestamp.new(tweet.created_at)
-      else
-        event = LogStash::Event.new(
-          LogStash::Event::TIMESTAMP => LogStash::Timestamp.new(tweet.created_at),
-          "message" => tweet.full_text,
-          "user" => tweet.user.screen_name,
-          "client" => tweet.source,
-          "retweeted" => tweet.retweeted?,
-          "source" => "http://twitter.com/#{tweet.user.screen_name}/status/#{tweet.id}"
-        )
-      end
-      decorate(event)
-      event["in-reply-to"] = tweet.in_reply_to_status_id if tweet.reply?
-      unless tweet.urls.empty?
-        event["urls"] = tweet.urls.map(&:expanded_url).map(&:to_s)
-      end
-      queue << event
-    end # client.filter
+    begin
+      @client.filter(:track => @keywords.join(",")) do |tweet|
+        if tweet.is_a?(Twitter::Tweet)
+          @logger.debug? && @logger.debug("Got tweet", :user => tweet.user.screen_name, :text => tweet.text)
+          if @full_tweet
+            event = LogStash::Event.new(LogStash::Util.stringify_symbols(tweet.to_hash))
+            event.timestamp = LogStash::Timestamp.new(tweet.created_at)
+          else
+            event = LogStash::Event.new(
+              LogStash::Event::TIMESTAMP => LogStash::Timestamp.new(tweet.created_at),
+              "message" => tweet.full_text,
+              "user" => tweet.user.screen_name,
+              "client" => tweet.source,
+              "retweeted" => tweet.retweeted?,
+              "source" => "http://twitter.com/#{tweet.user.screen_name}/status/#{tweet.id}"
+            )
+            event["in-reply-to"] = tweet.in_reply_to_status_id if tweet.reply?
+            unless tweet.urls.empty?
+              event["urls"] = tweet.urls.map(&:expanded_url).map(&:to_s)
+            end
+          end
+
+          decorate(event)
+          queue << event
+        end
+      end # client.filter
+    rescue LogStash::ShutdownSignal
+      return
+    rescue Twitter::Error::TooManyRequests => e
+      @logger.warn("Twitter too many requests error, sleeping for #{e.rate_limit.reset_in}s")
+      sleep(e.rate_limit.reset_in)
+      retry
+    rescue => e
+      @logger.warn("Twitter client error", :message => e.message, :exception => e, :backtrace => e.backtrace)
+      retry
+    end
   end # def run
 end # class LogStash::Inputs::Twitter