From 10ac156f3e6aa6bdbbab68150a87d6a14c97f33d Mon Sep 17 00:00:00 2001
From: Colin Surprenant
Date: Wed, 18 Jun 2014 18:10:14 +0000
Subject: [PATCH] add robustness to tweets stream handling

Fixes #1450
---
# Review notes (free-form text after the three-dash separator; not part of the
# commit message and not part of the diff):
#
# Purpose: rewrites LogStash::Inputs::Twitter#run so that the streaming call
# to @client.filter is wrapped in begin/rescue and re-entered with `retry`
# after an error, instead of letting the exception leave `run`.
#
# What the hunk changes:
#   * Only objects for which tweet.is_a?(Twitter::Tweet) holds are turned into
#     events; any other object yielded by the block is skipped.
#   * The per-tweet "Got tweet" log line moves from info level to debug level.
#   * "in-reply-to" and "urls" are now set only in the non-@full_tweet branch;
#     before this patch they were set after decorate(event) in both modes.
#   * decorate(event) and queue << event now run last, after all fields are set.
#   * LogStash::ShutdownSignal makes `run` return.
#   * Twitter::Error::TooManyRequests logs a warning, sleeps for
#     e.rate_limit.reset_in seconds, then retries.
#   * Any other StandardError logs a warning (message, exception, backtrace)
#     and retries.
#
# NOTE(review): sleep(e.rate_limit.reset_in) assumes reset_in is never nil.
#   If it can be nil, the sleep call would raise inside the rescue clause,
#   where the following `rescue => e` does not apply - confirm against the
#   twitter gem's RateLimit#reset_in.
# NOTE(review): the generic `rescue => e` branch retries immediately, with no
#   delay and no retry limit; a persistent failure would presumably loop
#   without pause - consider a backoff.

 lib/logstash/inputs/twitter.rb | 58 +++++++++++++++++++++-------------
 1 file changed, 36 insertions(+), 22 deletions(-)

diff --git a/lib/logstash/inputs/twitter.rb b/lib/logstash/inputs/twitter.rb
index 75ba6884d..b746c9d1b 100644
--- a/lib/logstash/inputs/twitter.rb
+++ b/lib/logstash/inputs/twitter.rb
@@ -65,27 +65,41 @@ class LogStash::Inputs::Twitter < LogStash::Inputs::Base
   public
   def run(queue)
     @logger.info("Starting twitter tracking", :keywords => @keywords)
-    @client.filter(:track => @keywords.join(",")) do |tweet|
-      @logger.info? && @logger.info("Got tweet", :user => tweet.user.screen_name, :text => tweet.text)
-      if @full_tweet
-        event = LogStash::Event.new(LogStash::Util.stringify_symbols(tweet.to_hash))
-        event.timestamp = LogStash::Timestamp.new(tweet.created_at)
-      else
-        event = LogStash::Event.new(
-          LogStash::Event::TIMESTAMP => LogStash::Timestamp.new(tweet.created_at),
-          "message" => tweet.full_text,
-          "user" => tweet.user.screen_name,
-          "client" => tweet.source,
-          "retweeted" => tweet.retweeted?,
-          "source" => "http://twitter.com/#{tweet.user.screen_name}/status/#{tweet.id}"
-        )
-      end
-      decorate(event)
-      event["in-reply-to"] = tweet.in_reply_to_status_id if tweet.reply?
-      unless tweet.urls.empty?
-        event["urls"] = tweet.urls.map(&:expanded_url).map(&:to_s)
-      end
-      queue << event
-    end # client.filter
+    begin
+      @client.filter(:track => @keywords.join(",")) do |tweet|
+        if tweet.is_a?(Twitter::Tweet)
+          @logger.debug? && @logger.debug("Got tweet", :user => tweet.user.screen_name, :text => tweet.text)
+          if @full_tweet
+            event = LogStash::Event.new(LogStash::Util.stringify_symbols(tweet.to_hash))
+            event.timestamp = LogStash::Timestamp.new(tweet.created_at)
+          else
+            event = LogStash::Event.new(
+              LogStash::Event::TIMESTAMP => LogStash::Timestamp.new(tweet.created_at),
+              "message" => tweet.full_text,
+              "user" => tweet.user.screen_name,
+              "client" => tweet.source,
+              "retweeted" => tweet.retweeted?,
+              "source" => "http://twitter.com/#{tweet.user.screen_name}/status/#{tweet.id}"
+            )
+            event["in-reply-to"] = tweet.in_reply_to_status_id if tweet.reply?
+            unless tweet.urls.empty?
+              event["urls"] = tweet.urls.map(&:expanded_url).map(&:to_s)
+            end
+          end
+
+          decorate(event)
+          queue << event
+        end
+      end # client.filter
+    rescue LogStash::ShutdownSignal
+      return
+    rescue Twitter::Error::TooManyRequests => e
+      @logger.warn("Twitter too many requests error, sleeping for #{e.rate_limit.reset_in}s")
+      sleep(e.rate_limit.reset_in)
+      retry
+    rescue => e
+      @logger.warn("Twitter client error", :message => e.message, :exception => e, :backtrace => e.backtrace)
+      retry
+    end
   end # def run
 end # class LogStash::Inputs::Twitter