mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Merge pull request #522 from AaronTheApe/master
SQS Output Now Supports Batching
This commit is contained in:
commit
ca6a498988
2 changed files with 48 additions and 2 deletions
|
@ -69,6 +69,9 @@ class LogStash::Inputs::SQS < LogStash::Inputs::Threadable
|
|||
# Name of the event field in which to store the SQS message MD5 checksum
|
||||
config :md5_field, :validate => :string
|
||||
|
||||
# Name of the event field in which to store the SQS message Sent Timestamp
|
||||
config :sent_timestamp_field, :validate => :string
|
||||
|
||||
public
|
||||
def aws_service_endpoint(region)
|
||||
return {
|
||||
|
@ -104,7 +107,8 @@ class LogStash::Inputs::SQS < LogStash::Inputs::Threadable
|
|||
|
||||
receive_opts = {
|
||||
:limit => 10,
|
||||
:visibility_timeout => 30
|
||||
:visibility_timeout => 30,
|
||||
:attributes => [:sent_at]
|
||||
}
|
||||
|
||||
continue_polling = true
|
||||
|
@ -120,7 +124,10 @@ class LogStash::Inputs::SQS < LogStash::Inputs::Threadable
|
|||
if @md5_field
|
||||
e[@md5_field] = message.md5
|
||||
end
|
||||
@logger.debug("Processed SQS message", :message_id => message.id, :message_md5 => message.md5, :queue => @queue)
|
||||
if @sent_timestamp_field
|
||||
e[@sent_timestamp_field] = message.sent_timestamp.utc
|
||||
end
|
||||
@logger.debug("Processed SQS message", :message_id => message.id, :message_md5 => message.md5, :sent_timestamp => message.sent_timestamp, :queue => @queue)
|
||||
output_queue << e
|
||||
message.delete
|
||||
end # valid event
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
require "logstash/outputs/base"
|
||||
require "logstash/namespace"
|
||||
require "logstash/plugin_mixins/aws_config"
|
||||
require "stud/buffer"
|
||||
|
||||
# Push events to an Amazon Web Services Simple Queue Service (SQS) queue.
|
||||
#
|
||||
|
@ -57,6 +58,7 @@ require "logstash/plugin_mixins/aws_config"
|
|||
#
|
||||
class LogStash::Outputs::SQS < LogStash::Outputs::Base
|
||||
include LogStash::PluginMixins::AwsConfig
|
||||
include Stud::Buffer
|
||||
|
||||
config_name "sqs"
|
||||
milestone 1
|
||||
|
@ -64,6 +66,16 @@ class LogStash::Outputs::SQS < LogStash::Outputs::Base
|
|||
# Name of SQS queue to push messages into. Note that this is just the name of the queue, not the URL or ARN.
|
||||
config :queue, :validate => :string, :required => true
|
||||
|
||||
# Set to true if you want send messages to SQS in batches with batch_send
|
||||
# from the amazon sdk
|
||||
config :batch, :validate => :boolean, :default => true
|
||||
|
||||
# If batch is set to true, the number of events we queue up for a batch_send.
|
||||
config :batch_events, :validate => :number, :default => 10
|
||||
|
||||
# If batch is set to true, the maximum amount of time between batch_send commands when there are pending events to flush.
|
||||
config :batch_timeout, :validate => :number, :default => 5
|
||||
|
||||
public
|
||||
def aws_service_endpoint(region)
|
||||
return {
|
||||
|
@ -77,6 +89,23 @@ class LogStash::Outputs::SQS < LogStash::Outputs::Base
|
|||
|
||||
@sqs = AWS::SQS.new(aws_options_hash)
|
||||
|
||||
if @batch
|
||||
if @batch_events > 10
|
||||
raise RuntimeError.new(
|
||||
"AWS only allows a batch_events parameter of 10 or less"
|
||||
)
|
||||
elsif @batch_events <= 1
|
||||
raise RuntimeError.new(
|
||||
"batch_events parameter must be greater than 1 (or its not a batch)"
|
||||
)
|
||||
end
|
||||
buffer_initialize(
|
||||
:max_items => @batch_events,
|
||||
:max_interval => @batch_timeout,
|
||||
:logger => @logger
|
||||
)
|
||||
end
|
||||
|
||||
begin
|
||||
@logger.debug("Connecting to AWS SQS queue '#{@queue}'...")
|
||||
@sqs_queue = @sqs.queues.named(@queue)
|
||||
|
@ -89,11 +118,21 @@ class LogStash::Outputs::SQS < LogStash::Outputs::Base
|
|||
|
||||
public
|
||||
def receive(event)
|
||||
if @batch
|
||||
buffer_receive(event.to_json)
|
||||
return
|
||||
end
|
||||
@sqs_queue.send_message(event.to_json)
|
||||
end # def receive
|
||||
|
||||
# called from Stud::Buffer#buffer_flush when there are events to flush
|
||||
def flush(events, teardown=false)
|
||||
@sqs_queue.batch_send(events)
|
||||
end
|
||||
|
||||
public
|
||||
def teardown
|
||||
buffer_flush(:final => true)
|
||||
@sqs_queue = nil
|
||||
finished
|
||||
end # def teardown
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue