mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Merge remote-tracking branch 'logstash/master'
This commit is contained in:
commit
7a7bb93793
7 changed files with 121 additions and 71 deletions
2
Rakefile
2
Rakefile
|
@ -2,7 +2,7 @@ require 'tempfile'
|
|||
require 'ftools' # fails in 1.9.2
|
||||
|
||||
# TODO(sissel): load the gemspec and parse the version from it instead.
|
||||
LOGSTASH_VERSION = "1.0.2"
|
||||
LOGSTASH_VERSION = "1.0.4"
|
||||
|
||||
# Compile config grammar (ragel -> ruby)
|
||||
file "lib/logstash/config/grammar.rb" => ["lib/logstash/config/grammar.rl"] do
|
||||
|
|
|
@ -35,8 +35,8 @@ for such things, that works for me, too.)
|
|||
|
||||
logstash releases come in a few flavors.
|
||||
|
||||
* [Monolithic jar](http://semicomplete.com/files/logstash/logstash-1.0.2-monolithic.jar)
|
||||
* [rubygem](https://github.com/downloads/logstash/releases/logstash-1.0.2.gem)
|
||||
* [Monolithic jar](http://semicomplete.com/files/logstash/logstash-1.0.4-monolithic.jar)
|
||||
* [rubygem](https://github.com/downloads/logstash/releases/logstash-1.0.4.gem)
|
||||
* [`gem install logstash`](http://rubygems.org/gems/logstash)
|
||||
|
||||
## What's next?
|
||||
|
|
21
docs/release-engineering.md
Normal file
21
docs/release-engineering.md
Normal file
|
@ -0,0 +1,21 @@
|
|||
---
|
||||
title: Release Engineering - logstash
|
||||
layout: content_right
|
||||
---
|
||||
|
||||
# logstash rel-eng.
|
||||
|
||||
The version patterns for logstash are x.y.z
|
||||
|
||||
* In the same x.y release, no backwards-incompatible changes will be made.
|
||||
* Between x.y.z and x.y.(z+1), deprecations are allowed but should be
|
||||
functional through the next release.
|
||||
* Any backwards-incompatible changes should be well-documented and, if
|
||||
possible, should include tools to help in migrating.
|
||||
* It is OK to add features, plugins, etc, in minor releases as long as they do
|
||||
not break existing functionality.
|
||||
|
||||
I do not suspect the 'x' (currently 1) will change frequently. It should only change
|
||||
if there are major, backwards-incompatible changes made to logstash, and I'm
|
||||
trying to not make those changes, so logstash should forever be at 1.y,z,
|
||||
right? ;)
|
|
@ -1,34 +1,44 @@
|
|||
require "logstash/inputs/base"
|
||||
require "logstash/namespace"
|
||||
require "redis"
|
||||
|
||||
# read events from a redis using BLPOP
|
||||
#
|
||||
# For more information about redis, see <http://redis.io/>
|
||||
class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
||||
|
||||
config_name "redis"
|
||||
|
||||
# name is used for logging in case there are multiple instances
|
||||
config :name, :validate => :string, :default => 'default'
|
||||
config :name, :validate => :string, :default => "default"
|
||||
|
||||
config :host, :validate => :string
|
||||
# the hostname of your redis server
|
||||
config :host, :validate => :string, :default => "localhost"
|
||||
|
||||
# the port to connect on (optional)
|
||||
config :port, :validate => :number
|
||||
|
||||
# The redis database number
|
||||
config :db, :validate => :number
|
||||
|
||||
# Timeout of some kind? This isn't really documented in the Redis rubygem
|
||||
# docs...
|
||||
config :timeout, :validate => :number
|
||||
|
||||
# Password to authenticate with
|
||||
config :password, :validate => :password
|
||||
|
||||
# The name of the redis queue (we'll use BLPOP against this)
|
||||
config :queue, :validate => :string, :required => true
|
||||
|
||||
# Maximum number of retries on a read before we give up.
|
||||
config :retries, :validate => :number, :default => 5
|
||||
|
||||
def register
|
||||
require 'redis'
|
||||
@redis = nil
|
||||
end
|
||||
|
||||
def connect
|
||||
require 'redis'
|
||||
Redis.new(
|
||||
:host => @host,
|
||||
:port => @port,
|
||||
|
@ -38,31 +48,28 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
)
|
||||
end
|
||||
|
||||
def run output_queue
|
||||
Thread.new do
|
||||
LogStash::Util::set_thread_name("input|redis|#{@queue}")
|
||||
retries = @retries
|
||||
loop do
|
||||
def run(output_queue)
|
||||
retries = @retries
|
||||
loop do
|
||||
begin
|
||||
@redis ||= connect
|
||||
response = @redis.blpop @queue, 0
|
||||
retries = @retries
|
||||
begin
|
||||
@redis ||= connect
|
||||
response = @redis.blpop @queue, 0
|
||||
retries = @retries
|
||||
begin
|
||||
output_queue << LogStash::Event.new(JSON.parse(response[1]))
|
||||
rescue # parse or event creation error
|
||||
@logger.error "failed to create event with '#{response[1]}'"
|
||||
@logger.error $!
|
||||
end
|
||||
rescue # redis error
|
||||
raise RuntimeError.new "Redis connection failed too many times" if retries <= 0
|
||||
@redis = nil
|
||||
@logger.warn "Failed to get event from redis #{@name}. "+
|
||||
"Will retry #{retries} times."
|
||||
@logger.warn $!
|
||||
retries -= 1
|
||||
sleep 1
|
||||
output_queue << LogStash::Event.new(JSON.parse(response[1]))
|
||||
rescue # parse or event creation error
|
||||
@logger.error "failed to create event with '#{response[1]}'"
|
||||
@logger.error $!
|
||||
end
|
||||
rescue # redis error
|
||||
raise RuntimeError.new "Redis connection failed too many times" if retries <= 0
|
||||
@redis = nil
|
||||
@logger.warn "Failed to get event from redis #{@name}. "+
|
||||
"Will retry #{retries} times."
|
||||
@logger.warn $!
|
||||
retries -= 1
|
||||
sleep 1
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end # loop
|
||||
end # def run
|
||||
end # class LogStash::Inputs::Redis
|
||||
|
|
|
@ -59,39 +59,45 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
|
|||
private
|
||||
def udp_listener(output_queue)
|
||||
@logger.info("Starting syslog udp listener on #{@host}:#{@port}")
|
||||
s = UDPSocket.new
|
||||
s.bind(@host, @port)
|
||||
server = UDPSocket.new(Socket::AF_INET)
|
||||
server.bind(@host, @port)
|
||||
|
||||
loop do
|
||||
line, client = s.recvfrom(1024)
|
||||
event = LogStash::Event.new({
|
||||
"@message" => line.chomp,
|
||||
"@type" => @type,
|
||||
"@tags" => @tags.clone,
|
||||
})
|
||||
source_base = URI::Generic.new("syslog", nil, client[3], nil, nil, nil, nil, nil, nil, nil)
|
||||
syslog_relay(event, source)
|
||||
line, client = server.recvfrom(9000)
|
||||
p :client => client
|
||||
p :line => line
|
||||
begin
|
||||
event = LogStash::Event.new({
|
||||
"@message" => line.chomp,
|
||||
"@type" => @type,
|
||||
"@tags" => @tags.clone,
|
||||
})
|
||||
source = URI::Generic.new("syslog", nil, client[3], nil, nil, nil, nil, nil, nil, nil)
|
||||
syslog_relay(event, source)
|
||||
rescue => e
|
||||
p :exception => e
|
||||
end
|
||||
output_queue << event
|
||||
end
|
||||
ensure
|
||||
if s
|
||||
s.close_read
|
||||
s.close_write
|
||||
if server
|
||||
server.close_read
|
||||
server.close_write
|
||||
end
|
||||
end # def udp_listener
|
||||
|
||||
private
|
||||
def tcp_listener(output_queue)
|
||||
@logger.info("Starting syslog tcp listener on #{@host}:#{@port}")
|
||||
s = TCPServer.new(@host, @port)
|
||||
server = TCPServer.new(@host, @port)
|
||||
|
||||
loop do
|
||||
Thread.new(s.accept) do |s|
|
||||
ip, port = s.peeraddr[3], s.peeraddr[1]
|
||||
Thread.new(server.accept) do |client|
|
||||
ip, port = client.peeraddr[3], client.peeraddr[1]
|
||||
@logger.warn("got connection from #{ip}:#{port}")
|
||||
LogStash::Util::set_thread_name("input|syslog|tcp|#{ip}:#{port}}")
|
||||
source_base = URI::Generic.new("syslog", nil, ip, nil, nil, nil, nil, nil, nil, nil)
|
||||
s.each do |line|
|
||||
client.each do |line|
|
||||
event = LogStash::Event.new({
|
||||
"@message" => line.chomp,
|
||||
"@type" => @type,
|
||||
|
@ -104,7 +110,7 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
|
|||
end
|
||||
end
|
||||
ensure
|
||||
s.close if s
|
||||
server.close if server
|
||||
end # def tcp_listener
|
||||
|
||||
# Following RFC3164 where sane, we'll try to parse a received message
|
||||
|
|
|
@ -2,6 +2,9 @@ require "logstash/outputs/base"
|
|||
require "logstash/namespace"
|
||||
require 'eventmachine'
|
||||
|
||||
# send events to a redis databse using RPUSH
|
||||
#
|
||||
# For more information about redis, see <http://redis.io/>
|
||||
class LogStash::Outputs::Redis < LogStash::Outputs::Base
|
||||
|
||||
config_name "redis"
|
||||
|
@ -9,24 +12,35 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
|
|||
# name is used for logging in case there are multiple instances
|
||||
config :name, :validate => :string, :default => 'default'
|
||||
|
||||
# the hostname of your redis server
|
||||
config :host, :validate => :string
|
||||
|
||||
# the port to connect on (optional)
|
||||
config :port, :validate => :number
|
||||
|
||||
# The redis database number
|
||||
config :db, :validate => :number
|
||||
|
||||
# Timeout of some kind? This isn't really documented in the Redis rubygem
|
||||
# docs...
|
||||
config :timeout, :validate => :number
|
||||
|
||||
# Password to authenticate with
|
||||
config :password, :validate => :password
|
||||
|
||||
# The name of the redis queue (we'll use RPUSH on this). Dynamic names are
|
||||
# valid here, for example "logstash-%{@type}"
|
||||
config :queue, :validate => :string, :required => true
|
||||
|
||||
# Maximum number of retries on a read before we give up.
|
||||
config :retries, :validate => :number, :default => 5
|
||||
|
||||
def register
|
||||
require 'redis'
|
||||
@redis = nil
|
||||
end
|
||||
end # def register
|
||||
|
||||
def connect
|
||||
require 'redis'
|
||||
Redis.new(
|
||||
:host => @host,
|
||||
:port => @port,
|
||||
|
@ -34,26 +48,28 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
|
|||
:db => @db,
|
||||
:password => @password
|
||||
)
|
||||
end
|
||||
end # def connect
|
||||
|
||||
def receive event, tries=5
|
||||
if tries > 0
|
||||
begin
|
||||
@redis ||= connect
|
||||
@redis.rpush event.sprintf(@queue), event.to_json
|
||||
rescue
|
||||
@redis = nil
|
||||
@logger.warn "Failed to log #{event.to_s} to redis #{@name}. "+
|
||||
"Will retry #{tries} times."
|
||||
@logger.warn $!
|
||||
Thread.new do
|
||||
sleep 1
|
||||
receive event, tries - 1
|
||||
end
|
||||
end
|
||||
else
|
||||
def receive(event, tries=@retries)
|
||||
if tries <= 0
|
||||
@logger.error "Fatal error, failed to log #{event.to_s} to redis #{@name}"
|
||||
raise RuntimeError.new "Failed to log to redis #{@name}"
|
||||
end
|
||||
end
|
||||
|
||||
begin
|
||||
@redis ||= connect
|
||||
@redis.rpush event.sprintf(@queue), event.to_json
|
||||
rescue
|
||||
# TODO(sissel): Be specific in the exceptions we rescue.
|
||||
# Drop the redis connection to be picked up later during a retry.
|
||||
@redis = nil
|
||||
@logger.warn "Failed to log #{event.to_s} to redis #{@name}. "+
|
||||
"Will retry #{tries} times."
|
||||
@logger.warn $!
|
||||
Thread.new do
|
||||
sleep 1
|
||||
receive event, tries - 1
|
||||
end
|
||||
end
|
||||
end # def receive
|
||||
end
|
||||
|
|
|
@ -16,7 +16,7 @@ Gem::Specification.new do |spec|
|
|||
#rev = %x{svn info}.split("\n").grep(/Revision:/).first.split(" ").last.to_i
|
||||
rev = Time.now.strftime("%Y%m%d%H%M%S")
|
||||
spec.name = "logstash"
|
||||
spec.version = "1.0.2"
|
||||
spec.version = "1.0.4"
|
||||
spec.summary = "logstash - log and event management"
|
||||
spec.description = "scalable log and event management (search, archive, pipeline)"
|
||||
spec.license = "Apache License (2.0)"
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue