mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
- factor out search client
- make web and bin/search.rb use the new search client class
This commit is contained in:
parent
15b830abc4
commit
a35abbbdad
4 changed files with 99 additions and 127 deletions
|
@ -1,98 +1,22 @@
|
|||
#!/usr/bin/ruby
|
||||
#
|
||||
require "rubygems"
|
||||
require "socket"
|
||||
require "lib/net/message"
|
||||
require "lib/net/client"
|
||||
require "lib/net/clients/search"
|
||||
require "lib/config/base"
|
||||
require "lib/net/messages/directory"
|
||||
require "lib/net/messages/indexevent"
|
||||
require "lib/net/messages/search"
|
||||
require "lib/net/messages/searchhits"
|
||||
require "lib/net/messages/ping"
|
||||
require "set"
|
||||
|
||||
Thread::abort_on_exception = true
|
||||
|
||||
$done = false
|
||||
$lastid = nil
|
||||
$count = 0
|
||||
$time = 0
|
||||
$start = Time.now.to_f
|
||||
|
||||
class SearchClient < LogStash::Net::MessageClient
|
||||
attr_accessor :indexers
|
||||
attr_reader :hits
|
||||
attr_reader :responding
|
||||
attr_reader :results
|
||||
|
||||
def initialize(config_file)
|
||||
@indexers = Array.new
|
||||
@responding = Array.new
|
||||
@hits = 0
|
||||
@results = []
|
||||
@result_mutex = Mutex.new
|
||||
config = LogStash::Config::BaseConfig.new(config_file)
|
||||
super(config, "search")
|
||||
start
|
||||
end
|
||||
|
||||
def start
|
||||
# find our indexers
|
||||
msg = LogStash::Net::Messages::DirectoryRequest.new
|
||||
op = sendmsg("logstash-directory", msg) do |response|
|
||||
DirectoryResponseHandler(response)
|
||||
:finished
|
||||
end
|
||||
|
||||
op.wait_until_finished
|
||||
end
|
||||
|
||||
def SearchResponseHandler(msg)
|
||||
@result_mutex.synchronize do
|
||||
msg.results.each do |result|
|
||||
@results << result
|
||||
end
|
||||
if msg.finished
|
||||
@responding << msg.indexer_id
|
||||
if @responding.length == @indexers.length
|
||||
close
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def SearchHitsResponseHandler(msg)
|
||||
@result_mutex.synchronize do
|
||||
@hits += msg.hits
|
||||
end
|
||||
end
|
||||
|
||||
def DirectoryResponseHandler(msg)
|
||||
@indexers = msg.indexers
|
||||
end
|
||||
|
||||
def search(log_type, query)
|
||||
hits_msg = LogStash::Net::Messages::SearchHitsRequest.new
|
||||
hits_msg.log_type = log_type
|
||||
hits_msg.query = query
|
||||
search_msg = LogStash::Net::Messages::SearchRequest.new
|
||||
search_msg.log_type = log_type
|
||||
search_msg.query = query
|
||||
search_msg.limit = 100
|
||||
@indexers.each do |i|
|
||||
sendmsg(i, hits_msg)
|
||||
sendmsg(i, search_msg)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def main(args)
|
||||
if ARGV.length != 3
|
||||
$stderr.puts "Usage: search configfile log_type query"
|
||||
end
|
||||
client = SearchClient.new(args[0])
|
||||
client.search(args[1], args[2])
|
||||
client = LogStash::Net::Clients::Search.new(args[0])
|
||||
client.search({
|
||||
:log_type => args[1],
|
||||
:query => args[2],
|
||||
:limit => 100,
|
||||
})
|
||||
|
||||
# Wait for the client to decide it's done.
|
||||
client.run
|
||||
|
|
83
lib/net/clients/search.rb
Normal file
83
lib/net/clients/search.rb
Normal file
|
@ -0,0 +1,83 @@
|
|||
#!/usr/bin/ruby
|
||||
#
|
||||
require "rubygems"
|
||||
require "lib/net/message"
|
||||
require "lib/net/client"
|
||||
require "lib/config/base"
|
||||
require "lib/net/messages/directory"
|
||||
require "lib/net/messages/indexevent"
|
||||
require "lib/net/messages/search"
|
||||
require "lib/net/messages/searchhits"
|
||||
require "lib/net/messages/ping"
|
||||
require "set"
|
||||
|
||||
|
||||
module LogStash::Net::Clients
|
||||
class Search < LogStash::Net::MessageClient
|
||||
attr_accessor :indexers
|
||||
attr_reader :hits
|
||||
attr_reader :responding
|
||||
attr_reader :results
|
||||
|
||||
def initialize(config_file)
|
||||
@indexers = Array.new
|
||||
@responding = Array.new
|
||||
@hits = 0
|
||||
@results = []
|
||||
@result_mutex = Mutex.new
|
||||
config = LogStash::Config::BaseConfig.new(config_file)
|
||||
super(config, "search")
|
||||
start
|
||||
end
|
||||
|
||||
def start
|
||||
# find our indexers
|
||||
msg = LogStash::Net::Messages::DirectoryRequest.new
|
||||
op = sendmsg("logstash-directory", msg) do |response|
|
||||
DirectoryResponseHandler(response)
|
||||
:finished
|
||||
end
|
||||
|
||||
op.wait_until_finished
|
||||
end
|
||||
|
||||
def SearchResponseHandler(msg)
|
||||
@result_mutex.synchronize do
|
||||
msg.results.each do |result|
|
||||
@results << result
|
||||
end
|
||||
if msg.finished
|
||||
@responding << msg.indexer_id
|
||||
if @responding.length == @indexers.length
|
||||
close
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def SearchHitsResponseHandler(msg)
|
||||
@result_mutex.synchronize do
|
||||
@hits += msg.hits
|
||||
end
|
||||
end
|
||||
|
||||
def DirectoryResponseHandler(msg)
|
||||
@indexers = msg.indexers
|
||||
end
|
||||
|
||||
def search(options)
|
||||
hits_msg = LogStash::Net::Messages::SearchHitsRequest.new
|
||||
hits_msg.log_type = options[:log_type]
|
||||
hits_msg.query = options[:query]
|
||||
search_msg = LogStash::Net::Messages::SearchRequest.new
|
||||
search_msg.log_type = options[:log_type]
|
||||
search_msg.query = options[:query]
|
||||
search_msg.limit = options[:limit]
|
||||
search_msg.offset = options[:offset]
|
||||
@indexers.each do |i|
|
||||
sendmsg(i, hits_msg)
|
||||
sendmsg(i, search_msg)
|
||||
end
|
||||
end
|
||||
end; end # class LogStash::Net::Clients::Search
|
||||
|
|
@ -1,34 +1,8 @@
|
|||
|
||||
$: << ".."
|
||||
require "lib/net/client"
|
||||
require "lib/net/messages/search"
|
||||
require "lib/net/messages/searchhits"
|
||||
require "lib/net/messages/ping"
|
||||
require "lib/net/clients/search"
|
||||
require "timeout"
|
||||
|
||||
class SearchClient < LogStash::Net::MessageClient
|
||||
attr_reader :results
|
||||
attr_reader :hits
|
||||
|
||||
def SearchHitsResponseHandler(msg)
|
||||
@hits = msg.hits
|
||||
end
|
||||
|
||||
def SearchResponseHandler(msg)
|
||||
if @results == nil
|
||||
@results = []
|
||||
end
|
||||
|
||||
msg.results.each do |result|
|
||||
@results << result
|
||||
end
|
||||
|
||||
if msg.finished
|
||||
close
|
||||
end
|
||||
end # def SearchResponseHandler
|
||||
end # class SearchClient
|
||||
|
||||
class Search < Application
|
||||
|
||||
def index
|
||||
|
@ -39,23 +13,14 @@ class Search < Application
|
|||
params[:offset] = (params[:offset] ? params[:offset].to_i : 0) rescue 0
|
||||
params[:limit] = (params[:limit] ? params[:limit].to_i : 20) rescue 20
|
||||
params[:log_type] = (params[:log_type] or "linux-syslog")
|
||||
@searchclient = SearchClient.new(host="localhost", port=61613)
|
||||
msg = LogStash::Net::Messages::SearchHitsRequest.new
|
||||
msg.log_type = params[:log_type]
|
||||
msg.query = params[:q]
|
||||
@searchclient.sendmsg("/queue/logstash-index", msg)
|
||||
|
||||
msg = LogStash::Net::Messages::SearchRequest.new
|
||||
msg.log_type = (params[:log_type] or "linux-syslog")
|
||||
msg.query = params[:q]
|
||||
msg.offset = params[:offset]
|
||||
msg.limit = params[:limit]
|
||||
@searchclient.sendmsg("/queue/logstash-index", msg)
|
||||
@search = LogStash::Net::Clients::Search.new("/home/jls/projects/logstash/logstashd.yaml")
|
||||
params[:query] = params[:q]
|
||||
@search.search(params)
|
||||
|
||||
Timeout.timeout(10) do
|
||||
@searchclient.run
|
||||
@search.run
|
||||
render
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -8,14 +8,14 @@
|
|||
<% end =%>
|
||||
<hr>
|
||||
|
||||
<h4>Results <%= params[:offset] %> - <%= params[:offset] + params[:limit] %> of <%= @searchclient.hits %> for <%=h params[:q] %></h4>
|
||||
<h4>Results <%= params[:offset] %> - <%= params[:offset] + params[:limit] %> of <%= @search.hits %> for <%=h params[:q] %></h4>
|
||||
<% if params[:offset] > 0 %>
|
||||
<%= link_to "prev", url(:controller => "search", :action => "query", :q => params[:q], :offset => [0, params[:offset] - params[:limit]].max, :limit => params[:limit]), :log_type => params[:log_type] %>
|
||||
<% end %>
|
||||
<% if params[:offset] + params[:limit] < @searchclient.hits %>
|
||||
<%= link_to "next", url(:controller => "search", :action => "query", :q => params[:q], :offset => [@searchclient.hits - params[:limit], params[:offset] + params[:limit]].min, :limit => params[:limit], :log_type => params[:log_type]) %>
|
||||
<% if params[:offset] + params[:limit] < @search.hits %>
|
||||
<%= link_to "next", url(:controller => "search", :action => "query", :q => params[:q], :offset => [@search.hits - params[:limit], params[:offset] + params[:limit]].min, :limit => params[:limit], :log_type => params[:log_type]) %>
|
||||
<% end %>
|
||||
|
||||
<pre>
|
||||
<%=h @searchclient.results.join("\n") %>
|
||||
<%=h @search.results.sort_by { |r| r[0] }.collect { |r| r[1] }.join("\n") %>
|
||||
</pre>
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue