diff --git a/bin/sendblock.rb b/bin/sendblock.rb new file mode 100644 index 000000000..e8123f683 --- /dev/null +++ b/bin/sendblock.rb @@ -0,0 +1,36 @@ +#!/usr/bin/ruby +# +require 'rubygems' +require "socket" +require "lib/net/message" +require "lib/net/client" +require "lib/net/messages/directory" +require "lib/net/messages/indexevent" +require "lib/net/messages/search" +require "lib/net/messages/searchhits" +require "lib/net/messages/ping" +require "set" +require "thread" + +Thread::abort_on_exception = true + +class SearchClient < LogStash::Net::MessageClient +end + +def main(args) + client = SearchClient.new + msg = LogStash::Net::Messages::PingRequest.new + op = client.sendmsg("logstash-directory", msg) do |response| + puts response + :finished + end + + op.wait_until_finished + puts "Done!" + + return 0 +end + +if $0 == __FILE__ + exit main(ARGV) +end diff --git a/lib/net/socket.rb b/lib/net/socket.rb index 05255399b..4dbba5a4b 100644 --- a/lib/net/socket.rb +++ b/lib/net/socket.rb @@ -6,7 +6,32 @@ require 'uuid' USE_MARSHAL = ENV.has_key?("USE_MARSHAL") + module LogStash; module Net + + class Operation + def initialize(callback) + @mutex = Mutex.new + @callback = callback + @cv = ConditionVariable.new + end # def initialize + + def call(*args) + @mutex.synchronize do + ret = @callback.call(*args) + if ret == :finished + @cv.signal + end + end + end # def call + + def wait_until_finished + @mutex.synchronize do + @cv.wait(@mutex) + end + end # def wait_until_finished + end # def Operation + # The MessageClient class exists only as an alias # to the MessageSocketMux. You should use the # client class if you are implementing a client. @@ -22,6 +47,7 @@ module LogStash; module Net @handler = self @outbuffer = Hash.new { |h,k| h[k] = [] } @mq = nil + @message_operations = Hash.new start_amqp end # def initialize @@ -69,10 +95,16 @@ module LogStash; module Net end name = message.class.name.split(":")[-1] func = "#{name}Handler" - if @handler.respond_to?(func) + + puts "Got message #{message.id}" + if @message_operations.has_key?(message.id) + operation = @message_operations[message.id] + operation.call message + elsif @handler.respond_to?(func) @handler.send(func, message) do |response| reply = message.replyto puts "sending reply to #{reply}" + puts "response id: #{response.id}" sendmsg(reply, response) end @@ -132,6 +164,9 @@ module LogStash; module Net def sendmsg_topic(key, msg) return unless @mq # wait until we are connected + if (msg.is_a?(RequestMessage) and msg.id == nil) + msg.generate_id! + end if USE_MARSHAL data = Marshal.dump(msg) @@ -142,16 +177,23 @@ module LogStash; module Net @mq.topic("amq.topic").publish(data, :key => key) end - def sendmsg(destination, msg) - if msg.is_a?(RequestMessage) + def sendmsg(destination, msg, &callback) + puts "sendmsg; #{msg.id}" + if (msg.is_a?(RequestMessage) and msg.id == nil) msg.generate_id! end msg.replyto = @id @outbuffer[destination] << msg - if @outbuffer[destination].length > MAXBUF flushout(destination) end + + if block_given? + puts "Block given for message #{msg.id}" + op = Operation.new(callback) + @message_operations[msg.id] = op + return op + end end def handler=(handler)