mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
- Add #searchhits to the search client. Takes a log type and array of queries,
returns a hash of query => hitcount - Fix LogStash::Operation behavior. If 'wait_until_finish' was called after the operation finished, not before, we would deadlock. Now any wait_until_finish call will succeed and return immediately if the operation has already finished. It will still block normally if the operation has not finished. - Comment-out the sliding window stuff - Add 'graphpoints' action for the web. Querying this will return an array of [timestamp_in_ms, hits] for the query for some period of timestamps for your query. Makes happy use of the Operation class so we can send a pile of search requests in parallel and wait until they finish. 24 queries (one for every hour) takes less than a second.
This commit is contained in:
parent
8b826e5215
commit
728221d326
4 changed files with 109 additions and 12 deletions
|
@ -106,5 +106,52 @@ module LogStash::Net::Clients
|
|||
|
||||
return [hits, results]
|
||||
end
|
||||
|
||||
def searchhits(log_type, queries)
|
||||
if !queries.is_a?(Array)
|
||||
queries = [queries]
|
||||
end
|
||||
|
||||
|
||||
hits = Hash.new { |h,k| h[k] = 0 }
|
||||
ops = []
|
||||
|
||||
queries.each do |query|
|
||||
options = {
|
||||
:query => query,
|
||||
:log_type => log_type,
|
||||
}
|
||||
|
||||
# Also skip things that need parsing when searching, by default.
|
||||
if !query.include?("@NEEDSPARSING")
|
||||
realquery = "(#{query}) AND -@NEEDSPARSING:1"
|
||||
else
|
||||
realquery = query
|
||||
end
|
||||
|
||||
@logger.info "Query: #{realquery}"
|
||||
|
||||
hits_msg = LogStash::Net::Messages::SearchHitsRequest.new
|
||||
hits_msg.log_type = options[:log_type]
|
||||
hits_msg.query = realquery
|
||||
@indexers.each do |i|
|
||||
ops << sendmsg(i, hits_msg) do |msg|
|
||||
@logger.debug "Got #{msg.class} with age #{msg.age} (query: #{query})"
|
||||
hits[query] += msg.hits
|
||||
@logger.debug "Hits: #{msg.hits}"
|
||||
:finished
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
remaining = ops.length
|
||||
ops.each do |op|
|
||||
op.wait_until_finished
|
||||
remaining -=1
|
||||
@logger.debug "Waiting for #{remaining} operations"
|
||||
end
|
||||
|
||||
return hits
|
||||
end
|
||||
end; end # class LogStash::Net::Clients::Search
|
||||
|
||||
|
|
|
@ -185,7 +185,7 @@ module LogStash; module Net; module Servers
|
|||
begin
|
||||
reader, search, qp = get_ferret(request.log_type)
|
||||
rescue Ferret::FileNotFoundError => e
|
||||
$logger.warn "get_ferret failed: #{e.inspect}"
|
||||
@logger.warn "get_ferret failed: #{e.inspect}"
|
||||
response.hits = 0
|
||||
yield response
|
||||
return
|
||||
|
@ -194,8 +194,8 @@ module LogStash; module Net; module Servers
|
|||
offset = (request.offset or 0)
|
||||
|
||||
# search_each returns number of hits, even if we don't yield them.
|
||||
hits = search.search_each(query, :limit => 1, :offset => offset,
|
||||
:sort => "@DATE") { |docid, score| }
|
||||
hits = search.search_each(query, :limit => 1, :offset => offset) { |docid, score| }
|
||||
|
||||
response.hits = hits
|
||||
yield response
|
||||
end # def SearchHitsRequestHandler
|
||||
|
|
|
@ -34,22 +34,30 @@ module LogStash; module Net
|
|||
@mutex = Mutex.new
|
||||
@callback = callback
|
||||
@cv = ConditionVariable.new
|
||||
@finished = false
|
||||
end # def initialize
|
||||
|
||||
def call(*args)
|
||||
@mutex.synchronize do
|
||||
ret = @callback.call(*args)
|
||||
if ret == :finished
|
||||
@finished = true
|
||||
@cv.signal
|
||||
else
|
||||
return ret
|
||||
end
|
||||
end
|
||||
end # def call
|
||||
|
||||
def wait_until_finished
|
||||
@mutex.synchronize do
|
||||
@cv.wait(@mutex)
|
||||
@cv.wait(@mutex) if !finished?
|
||||
end
|
||||
end # def wait_until_finished
|
||||
|
||||
def finished?
|
||||
return @finished
|
||||
end
|
||||
end # def Operation
|
||||
|
||||
# TODO: document this class
|
||||
|
@ -66,7 +74,7 @@ module LogStash; module Net
|
|||
@handler = self
|
||||
@receive_queue = Queue.new
|
||||
@outbuffer = Hash.new { |h,k| h[k] = [] }
|
||||
@slidingwindow = LogStash::SlidingWindowSet.new
|
||||
#@slidingwindow = LogStash::SlidingWindowSet.new
|
||||
@mq = nil
|
||||
@message_operations = Hash.new
|
||||
@startuplock = Mutex.new
|
||||
|
@ -139,10 +147,10 @@ module LogStash; module Net
|
|||
|
||||
obj.each do |item|
|
||||
message = Message.new_from_data(item)
|
||||
if @slidingwindow.include?(message.id)
|
||||
puts "Removing ack for #{message.id}"
|
||||
@slidingwindow.delete(message.id)
|
||||
end
|
||||
#if @slidingwindow.include?(message.id)
|
||||
#puts "Removing ack for #{message.id}"
|
||||
#@slidingwindow.delete(message.id)
|
||||
#end
|
||||
name = message.class.name.split(":")[-1]
|
||||
func = "#{name}Handler"
|
||||
|
||||
|
@ -224,8 +232,8 @@ module LogStash; module Net
|
|||
msg.replyto = @id
|
||||
|
||||
if (msg.is_a?(RequestMessage) and !msg.is_a?(ResponseMessage))
|
||||
$logger.info "Tracking #{msg.class.name}##{msg.id}"
|
||||
@slidingwindow << msg.id
|
||||
@logger.info "Tracking #{msg.class.name}##{msg.id}"
|
||||
#@slidingwindow << msg.id
|
||||
end
|
||||
|
||||
if msg.buffer?
|
||||
|
|
|
@ -14,7 +14,6 @@ class Search < Application
|
|||
params[:limit] = (params[:limit] ? params[:limit].to_i : 100) rescue 100
|
||||
params[:log_type] = (params[:log_type] or "linux-syslog")
|
||||
|
||||
#@search = LogStash::Net::Clients::Search.new("/opt/logstash/etc/logstashd.yaml")
|
||||
params[:query] = params[:q]
|
||||
|
||||
Timeout.timeout(10) do
|
||||
|
@ -22,4 +21,47 @@ class Search < Application
|
|||
render
|
||||
end
|
||||
end
|
||||
|
||||
def graphpoints
|
||||
provides :json
|
||||
params[:log_type] = (params[:log_type] or "linux-syslog")
|
||||
orig_query = params[:q]
|
||||
|
||||
day = 60 * 60 * 24
|
||||
hour = 60 * 60
|
||||
|
||||
starttime = (Time.now - day).to_i + Time.now.gmt_offset
|
||||
starttime = starttime - (starttime % hour)
|
||||
increment = 60 * 60
|
||||
curtime = starttime
|
||||
@points = []
|
||||
# correct for timezone date offset
|
||||
Timeout.timeout(20) do
|
||||
queries = {}
|
||||
while starttime + day > curtime
|
||||
endtime = curtime + increment - 1
|
||||
querygen = "@DATE:[#{curtime} #{endtime}] AND (#{orig_query})"
|
||||
puts "Query: #{querygen}"
|
||||
queries[querygen] = {
|
||||
:time => curtime,
|
||||
:query => querygen,
|
||||
}
|
||||
curtime += increment
|
||||
end
|
||||
|
||||
hits = $search.searchhits(params[:log_type], queries.keys)
|
||||
#puts queries.inspect
|
||||
hits.each do |key,count|
|
||||
#puts "query: #{queries.has_key?(key)} / #{key} "
|
||||
queries[key][:hits] = count
|
||||
end
|
||||
|
||||
@data = Hash.new
|
||||
queries.each do |query, entry|
|
||||
@data[entry[:time].to_i * 1000] = entry[:hits]
|
||||
end
|
||||
@data = @data.to_a
|
||||
render
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue