mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
LOGSTASH-1118 - Option to add event fields from SQS message ID & MD5
This commit is contained in:
parent
279711497a
commit
f3818ede20
1 changed files with 17 additions and 5 deletions
|
@ -56,13 +56,19 @@ require "logstash/plugin_mixins/aws_config"
|
||||||
#
|
#
|
||||||
class LogStash::Inputs::SQS < LogStash::Inputs::Threadable
|
class LogStash::Inputs::SQS < LogStash::Inputs::Threadable
|
||||||
include LogStash::PluginMixins::AwsConfig
|
include LogStash::PluginMixins::AwsConfig
|
||||||
|
|
||||||
config_name "sqs"
|
config_name "sqs"
|
||||||
plugin_status "experimental"
|
plugin_status "experimental"
|
||||||
|
|
||||||
# Name of the SQS Queue name to pull messages from. Note that this is just the name of the queue, not the URL or ARN.
|
# Name of the SQS Queue name to pull messages from. Note that this is just the name of the queue, not the URL or ARN.
|
||||||
config :queue, :validate => :string, :required => true
|
config :queue, :validate => :string, :required => true
|
||||||
|
|
||||||
|
# Name of the event field in which to store the SQS message ID
|
||||||
|
config :id_field, :validate => :string
|
||||||
|
|
||||||
|
# Name of the event field in which to store the SQS message MD5 checksum
|
||||||
|
config :md5_field, :validate => :string
|
||||||
|
|
||||||
public
|
public
|
||||||
def aws_service_endpoint(region)
|
def aws_service_endpoint(region)
|
||||||
return {
|
return {
|
||||||
|
@ -95,10 +101,10 @@ class LogStash::Inputs::SQS < LogStash::Inputs::Threadable
|
||||||
public
|
public
|
||||||
def run(output_queue)
|
def run(output_queue)
|
||||||
@logger.debug("Polling SQS queue", :queue => @queue)
|
@logger.debug("Polling SQS queue", :queue => @queue)
|
||||||
|
|
||||||
receive_opts = {
|
receive_opts = {
|
||||||
:limit => 10,
|
:limit => 10,
|
||||||
:visibility_timeout => 30
|
:visibility_timeout => 30
|
||||||
}
|
}
|
||||||
|
|
||||||
continue_polling = true
|
continue_polling = true
|
||||||
|
@ -108,6 +114,12 @@ class LogStash::Inputs::SQS < LogStash::Inputs::Threadable
|
||||||
if message
|
if message
|
||||||
e = to_event(message.body, @sqs_queue)
|
e = to_event(message.body, @sqs_queue)
|
||||||
if e
|
if e
|
||||||
|
if @id_field
|
||||||
|
e[@id_field] = message.id
|
||||||
|
end
|
||||||
|
if @md5_field
|
||||||
|
e[@md5_field] = message.md5
|
||||||
|
end
|
||||||
@logger.debug("Processed SQS message", :message_id => message.id, :message_md5 => message.md5, :queue => @queue)
|
@logger.debug("Processed SQS message", :message_id => message.id, :message_md5 => message.md5, :queue => @queue)
|
||||||
output_queue << e
|
output_queue << e
|
||||||
message.delete
|
message.delete
|
||||||
|
@ -135,7 +147,7 @@ class LogStash::Inputs::SQS < LogStash::Inputs::Threadable
|
||||||
@logger.error("AWS::EC2::Errors::RequestLimitExceeded ... failed.", :queue => @queue)
|
@logger.error("AWS::EC2::Errors::RequestLimitExceeded ... failed.", :queue => @queue)
|
||||||
return false
|
return false
|
||||||
end # retry limit exceeded
|
end # retry limit exceeded
|
||||||
|
|
||||||
begin
|
begin
|
||||||
block.call
|
block.call
|
||||||
rescue AWS::EC2::Errors::RequestLimitExceeded
|
rescue AWS::EC2::Errors::RequestLimitExceeded
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue