- BaseConfig subclasses MQRPC::Config now

- Update servers and clients to use MQRPC properly
This commit is contained in:
Jordan Sissel 2009-11-06 08:57:20 +00:00
parent 790b081a31
commit c9fa9c87af
4 changed files with 40 additions and 52 deletions

View file

@ -1,22 +1,18 @@
require 'rubygems'
require 'mqrpc'
require 'yaml'
module LogStash; module Config
# Base config class. All configs need to know how to get to a broker.
class BaseConfig
attr_reader :mqhost
attr_reader :mqport
attr_reader :mquser
attr_reader :mqpass
attr_reader :mqvhost
class BaseConfig < MQRPC::Config
def initialize(file)
obj = YAML::load(File.open(file).read())
@mqhost = obj["mqhost"] || "localhost"
@mqport = obj["mqport"] || 5672
@mquser = obj["mquser"] || "guest"
@mqpass = obj["mqpass"] || "guest"
@mqvhost = obj["mqvhost"] || "/"
@mqexchange = "logstash.topic"
end # def initialize
end # class BaseConfig
end; end # module LogStash::Config

View file

@ -1,14 +1,15 @@
require 'lib/config/agent'
require 'lib/net/client'
require 'lib/net/common'
require 'lib/net/messages/indexevent'
require 'lib/file/tail/since'
require 'socket'
module LogStash; module Net; module Clients
class Agent < LogStash::Net::MessageClient
class Agent < MQRPC::Agent
def initialize(configfile, logger)
@config = LogStash::Config::AgentConfig.new(configfile)
super(@config, nil)
MQRPC::logger = logger
super(@config)
@hostname = Socket.gethostname
@msgs = []
@logger = logger

View file

@ -1,23 +1,24 @@
require 'rubygems'
require 'lib/net/server'
require 'lib/net/message'
require 'ferret'
require 'lib/config/indexer.rb'
require 'lib/log/text'
require 'lib/net/messages/broadcast'
require 'lib/net/messages/directory'
require 'lib/net/messages/indexevent'
require 'lib/net/messages/logkeys'
require 'lib/net/messages/logtypes'
require 'lib/net/messages/ping'
require 'lib/net/messages/search'
require 'lib/net/messages/searchhits'
require 'lib/net/messages/ping'
require 'lib/config/indexer.rb'
require 'lib/util'
require 'ferret'
require 'lib/log/text'
require 'mqrpc'
require 'mqrpc/functions/ping'
require 'pp'
module LogStash; module Net; module Servers
class Indexer < LogStash::Net::MessageServer
class Indexer < MQRPC::Agent
include MQRPC::Functions::Ping
# Default broadcast interval is 30, but for debugging/testing, sometimes
# it's nice to set it lower.
BROADCAST_INTERVAL = (ENV["BROADCAST_INTERVAL"] or 30).to_i
@ -28,7 +29,8 @@ module LogStash; module Net; module Servers
@logger.progname = "indexer"
@config = LogStash::Config::IndexerConfig.new(configfile)
super(@config, @logger)
MQRPC::logger = @logger
super(@config)
@indexes = Hash.new
@lines = Hash.new { |h,k| h[k] = 0 }
@indexcount = 0
@ -41,8 +43,7 @@ module LogStash; module Net; module Servers
end
def IndexEventRequestHandler(request)
response = LogStash::Net::Messages::IndexEventResponse.new
response.id = request.id
response = LogStash::Net::Messages::IndexEventResponse.new(request)
@indexcount += 1
if @indexcount % 100 == 0
@ -54,20 +55,12 @@ module LogStash; module Net; module Servers
@indexes[log_type] ||= @config.logs[log_type].get_index
@indexes[log_type] << request.log_data
end
def PingRequestHandler(request)
@logger.debug "received PingRequest (#{request.pingdata})"
response = LogStash::Net::Messages::PingResponse.new
response.id = request.id
response.pingdata = request.pingdata
yield response
end
def LogTypesRequestHandler(request)
@logger.debug "received LogTypesRequest"
response = LogStash::Net::Messages::LogTypesResponse.new
response.id = request.id
response = LogStash::Net::Messages::LogTypesResponse.new(request)
response.types = @config.logs.types
yield response
end
@ -75,8 +68,7 @@ module LogStash; module Net; module Servers
def LogKeysRequestHandler(request)
@logger.debug "received LogKeysRequest"
reader, search, qp = get_ferret(request.log_type)
response = LogStash::Net::Messages::LogKeysResponse.new
response.id = request.id
response = LogStash::Net::Messages::LogKeysResponse.new(request)
response.keys = reader.fields
response.log_type = request.log_type
yield response
@ -100,8 +92,7 @@ module LogStash; module Net; module Servers
"#{request.log_type})"
@logger.debug "message age is #{request.age} seconds"
stopwatch = LogStash::StopWatch.new
response = LogStash::Net::Messages::SearchResponse.new
response.id = request.id
response = LogStash::Net::Messages::SearchResponse.new(request)
response.indexer_id = @id
if @config.logs[request.log_type].nil?
@ -154,8 +145,7 @@ module LogStash; module Net; module Servers
if (total and count < limit)
done = true
end
part_response = LogStash::Net::Messages::SearchResponse.new
part_response.id = request.id
part_response = LogStash::Net::Messages::SearchResponse.new(request)
part_response.results = results
part_response.finished = false
yield part_response
@ -172,8 +162,7 @@ module LogStash; module Net; module Servers
def SearchHitsRequestHandler(request)
@logger.debug "received SearchHitsRequest (#{request.query.inspect} in " \
"#{request.log_type})"
response = LogStash::Net::Messages::SearchHitsResponse.new
response.id = request.id
response = LogStash::Net::Messages::SearchHitsResponse.new(request)
if @config.logs[request.log_type].nil?
@logger.warn "SearchHitsRequest received for invalid log_type: " \
"#{request.log_type}"
@ -209,8 +198,7 @@ module LogStash; module Net; module Servers
def DirectoryRequestHandler(request)
@logger.debug "received DirectoryRequest"
response = LogStash::Net::Messages::DirectoryResponse.new
response.id = request.id
response = LogStash::Net::Messages::DirectoryResponse.new(request)
response.indexers = @indexers.keys
yield response
end

View file

@ -1,25 +1,25 @@
require 'rubygems'
require 'lib/net/server'
require 'lib/net/message'
require 'ferret'
require 'lib/config/indexer.rb'
require 'lib/log/text'
require 'lib/net/messages/indexevent'
require 'lib/net/messages/ping'
require 'lib/net/messages/search'
require 'lib/net/messages/searchhits'
require 'lib/net/messages/ping'
require 'lib/config/indexer.rb'
require 'ferret'
require 'lib/log/text'
require 'mqrpc'
require 'pp'
module LogStash; module Net; module Servers
class Parser < LogStash::Net::MessageServer
class Parser < MQRPC::Agent
SYNCDELAY = 10
def initialize(configfile, logger)
@config = LogStash::Config::IndexerConfig.new(configfile)
@logger = logger
@logger.progname = "parser"
super(@config, @logger)
MQRPC::logger = @logger
super(@config)
@lines = Hash.new { |h,k| h[k] = 0 }
@indexcount = 0
@starttime = Time.now
@ -28,8 +28,7 @@ module LogStash; module Net; module Servers
def IndexEventRequestHandler(request)
@logger.debug "received IndexEventRequest (for type " \
"#{request.log_type}): #{request.log_data}"
response = LogStash::Net::Messages::IndexEventResponse.new
response.id = request.id
response = LogStash::Net::Messages::IndexEventResponse.new(request)
@indexcount += 1
if @indexcount % 100 == 0
@ -58,10 +57,14 @@ module LogStash; module Net; module Servers
sendmsg("logstash-index", request)
end
def IndexEventResponseHandler(response)
# This message comes from the indexer, we don't need to really
# do anything with it.
end
def PingRequestHandler(request)
@logger.debug "received PingRequest (#{request.pingdata})"
response = LogStash::Net::Messages::PingResponse.new
response.id = request.id
response = LogStash::Net::Messages::PingResponse.new(request)
response.pingdata = request.pingdata
yield response
end