mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
- Support yielding to blocks for client and server #run methods
- Start hacking on having client writes block if we are far behind on getting message acks (w/ exponential backoff).
This commit is contained in:
parent
173840a827
commit
42df5ba36b
5 changed files with 24 additions and 18 deletions
30
bin/agent.rb
30
bin/agent.rb
|
@ -14,6 +14,9 @@ class Agent < LogStash::Net::MessageClient
|
|||
@host = host
|
||||
@port = port
|
||||
@watcher = nil
|
||||
|
||||
# TODO(sissel): This should go into the network code
|
||||
@needack = Hash.new
|
||||
start_log_watcher
|
||||
end # def initialize
|
||||
|
||||
|
@ -39,25 +42,22 @@ class Agent < LogStash::Net::MessageClient
|
|||
ier.log_data = string
|
||||
ier.metadata["source_host"] = @hostname
|
||||
|
||||
sent = false
|
||||
while !sent
|
||||
begin
|
||||
$stdout.write(".")
|
||||
$stdout.flush
|
||||
#puts "Trying to send: #{ier.inspect}"
|
||||
@connection.sendmsg(ier)
|
||||
sent = true
|
||||
rescue LogStash::Net::NoSocket
|
||||
# No client connection available, wait.
|
||||
puts "No client connection available to send on, sleeping..."
|
||||
sleep 1
|
||||
end
|
||||
#$stdout.write(".")
|
||||
$stdout.flush
|
||||
@connection.sendmsg(ier)
|
||||
@needack[ier.id] = ier
|
||||
|
||||
sleeptime = 0.1
|
||||
while @needack.length > 500
|
||||
sleeptime = [sleeptime * 2, 5].min
|
||||
$stderr.puts "Waiting for acks (#{sleeptime})... #{@needack.length}"
|
||||
sleep(sleeptime)
|
||||
end
|
||||
|
||||
end # def index
|
||||
|
||||
def IndexEventResponseHandler(msg)
|
||||
if msg.success?
|
||||
end
|
||||
@needack.delete(msg.id)
|
||||
end # def IndexEventResponseHandler
|
||||
end
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ module LogStash; module Net
|
|||
def run
|
||||
EventMachine.run do
|
||||
connect(@host, @port)
|
||||
yield self if block_given?
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ module LogStash; module Net
|
|||
def run
|
||||
EventMachine.run do
|
||||
listen(@host, @port)
|
||||
yield self if block_given?
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ module LogStash; module Net; module Servers
|
|||
@indexes = Hash.new
|
||||
@lines = Hash.new { |h,k| h[k] = 0 }
|
||||
@indexcount = 0
|
||||
@starttime = Time.now
|
||||
end
|
||||
|
||||
def IndexEventRequestHandler(request)
|
||||
|
@ -28,8 +29,10 @@ module LogStash; module Net; module Servers
|
|||
response.id = request.id
|
||||
@indexcount += 1
|
||||
|
||||
print "\rK#{@indexcount} (vs #{request.id})"
|
||||
#puts "#{@indexcount} (id: #{request.id})"
|
||||
if @indexcount % 100 == 0
|
||||
duration = (Time.now.to_f - @starttime.to_f)
|
||||
puts "%.2f" % (@indexcount / duration)
|
||||
end
|
||||
|
||||
log_type = request.log_type
|
||||
entry = $logs[log_type].parse_entry(request.log_data)
|
||||
|
|
|
@ -40,7 +40,7 @@ module LogStash; module Net
|
|||
end
|
||||
EventMachine.defer(operation, nil)
|
||||
#@handler.send(func, msg) do |response|
|
||||
#sendmsg(response)
|
||||
#sendmsg(response)
|
||||
#end
|
||||
else
|
||||
$stderr.puts "#{@handler.class.name} does not support #{func}"
|
||||
|
@ -50,6 +50,7 @@ module LogStash; module Net
|
|||
if len > 0
|
||||
puts "Removing #{len} bytes (#{count} packets)"
|
||||
@buffer[0 .. len - 1] = ""
|
||||
sleep 1
|
||||
end
|
||||
end # def receive_data
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue