get pushed into a queue that is managed by a separate thread. This is
necessary to prevent sendmsg() from blocking the main AMQP reader thread when
sendmsg() might block due to sliding window closure.
This queue length is unchecked, however, the correct fix is to unsubscribe
from the input (the AMQP queue) and only resubscribe once our queue has
cleared a bit.
returns a hash of query => hitcount
- Fix LogStash::Operation behavior. If 'wait_until_finish' was called after the
operation finished, not before, we would deadlock. Now any wait_until_finish
call will succeed and return immediately if the operation has already finished.
It will still block normally if the operation has not finished.
- Comment-out the sliding window stuff
- Add 'graphpoints' action for the web. Querying this will return an array of
[timestamp_in_ms, hits] for the query for some period of timestamps for your
query. Makes happy use of the Operation class so we can send a pile of search
requests in parallel and wait until they finish. 24 queries (one for every
hour) takes less than a second.
push into an array which would get flushed when full or once per second.
This was causing any normally-fast search request to take a minimum of 1 second.
The original need for buffering was due to features of STOMP (not used
anymore) that would limit the number of messages we could handle at once. We
would buffer multiple messages to a single STOMP queue message so that we
could handle multiple messages. This may not be a problem anymore with AMQP.
Set BUFFER_DELAY_MESSAGES to true in socket.rb if you want to re-enable this.
- Add 'timestamp' attribute for all messages. This attribute is set
automatically when it is sent with MessageSocket#sendmsg.
- Add blocking on MessageSocket#initialize to wait for the AMQP thread. Uses
the same mutex/conditionvariable method we use to block LogStash::Operation
instances.
- Removed old 'USE_MARSHAL' support as I think we're pretty happy with JSON now.
op = client.sendmsg("logstash-directory", msg) do |response|
puts response
:finished
end
op.wait_until_finished
puts "Done!"
If Socket#sendmsg is given a block, it will return an Operation that is
useful shown above.
- Things wanting to block until amqp/em are finished will still call
Socket#run, which now only blocks until the new amqp/em thread complete (with
Thread#join)