mirror of
https://github.com/elastic/logstash.git
synced 2025-04-22 13:47:21 -04:00
LOGSTASH-1118 - Option to add event fields from SQS message ID & MD5
This commit is contained in:
parent
279711497a
commit
f3818ede20
1 changed files with 17 additions and 5 deletions
|
@ -56,13 +56,19 @@ require "logstash/plugin_mixins/aws_config"
|
|||
#
|
||||
class LogStash::Inputs::SQS < LogStash::Inputs::Threadable
|
||||
include LogStash::PluginMixins::AwsConfig
|
||||
|
||||
|
||||
config_name "sqs"
|
||||
plugin_status "experimental"
|
||||
|
||||
# Name of the SQS Queue name to pull messages from. Note that this is just the name of the queue, not the URL or ARN.
|
||||
config :queue, :validate => :string, :required => true
|
||||
|
||||
# Name of the event field in which to store the SQS message ID
|
||||
config :id_field, :validate => :string
|
||||
|
||||
# Name of the event field in which to store the SQS message MD5 checksum
|
||||
config :md5_field, :validate => :string
|
||||
|
||||
public
|
||||
def aws_service_endpoint(region)
|
||||
return {
|
||||
|
@ -95,10 +101,10 @@ class LogStash::Inputs::SQS < LogStash::Inputs::Threadable
|
|||
public
|
||||
def run(output_queue)
|
||||
@logger.debug("Polling SQS queue", :queue => @queue)
|
||||
|
||||
|
||||
receive_opts = {
|
||||
:limit => 10,
|
||||
:visibility_timeout => 30
|
||||
:limit => 10,
|
||||
:visibility_timeout => 30
|
||||
}
|
||||
|
||||
continue_polling = true
|
||||
|
@ -108,6 +114,12 @@ class LogStash::Inputs::SQS < LogStash::Inputs::Threadable
|
|||
if message
|
||||
e = to_event(message.body, @sqs_queue)
|
||||
if e
|
||||
if @id_field
|
||||
e[@id_field] = message.id
|
||||
end
|
||||
if @md5_field
|
||||
e[@md5_field] = message.md5
|
||||
end
|
||||
@logger.debug("Processed SQS message", :message_id => message.id, :message_md5 => message.md5, :queue => @queue)
|
||||
output_queue << e
|
||||
message.delete
|
||||
|
@ -135,7 +147,7 @@ class LogStash::Inputs::SQS < LogStash::Inputs::Threadable
|
|||
@logger.error("AWS::EC2::Errors::RequestLimitExceeded ... failed.", :queue => @queue)
|
||||
return false
|
||||
end # retry limit exceeded
|
||||
|
||||
|
||||
begin
|
||||
block.call
|
||||
rescue AWS::EC2::Errors::RequestLimitExceeded
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue