mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
- s/LOGSTASH_HOME/LOGSTASH_DIR/
- Fix variable references - Add 'finished' field to SearchResponse to indicate the end of search results.
This commit is contained in:
parent
657266316d
commit
14d65e1522
6 changed files with 23 additions and 18 deletions
12
bin/agent.rb
12
bin/agent.rb
|
@ -3,6 +3,7 @@
|
||||||
require 'rubygems'
|
require 'rubygems'
|
||||||
require 'lib/net/client'
|
require 'lib/net/client'
|
||||||
require 'lib/net/messages/indexevent'
|
require 'lib/net/messages/indexevent'
|
||||||
|
require 'lib/file/tail/since'
|
||||||
require 'socket'
|
require 'socket'
|
||||||
|
|
||||||
|
|
||||||
|
@ -18,12 +19,10 @@ class Agent < LogStash::Net::MessageClient
|
||||||
|
|
||||||
def start_log_watcher
|
def start_log_watcher
|
||||||
Thread.new do
|
Thread.new do
|
||||||
IO.popen("tail -0f /var/log/messages", "r") do |fd|
|
File::Tail::Since.new("/var/log/messages").tail do |line|
|
||||||
fd.each do |line|
|
line.chomp!
|
||||||
puts "Found line: #{line}"
|
puts "Found line: #{line}"
|
||||||
line.chomp!
|
index(line)
|
||||||
index(line)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end # def start_log_watcher
|
end # def start_log_watcher
|
||||||
|
@ -55,7 +54,6 @@ class Agent < LogStash::Net::MessageClient
|
||||||
|
|
||||||
def run
|
def run
|
||||||
loop do
|
loop do
|
||||||
puts @done
|
|
||||||
done = false
|
done = false
|
||||||
while !done
|
while !done
|
||||||
begin
|
begin
|
||||||
|
|
|
@ -43,13 +43,13 @@ module LogStash
|
||||||
end
|
end
|
||||||
|
|
||||||
def index_dir
|
def index_dir
|
||||||
return "#{home}/var/indexes/#{@attrs["log:name"]}"
|
return "#{@home}/var/indexes/#{@attrs["log:name"]}"
|
||||||
end
|
end
|
||||||
|
|
||||||
def create_index
|
def create_index
|
||||||
return if File.exists?(index_dir)
|
return if File.exists?(index_dir)
|
||||||
|
|
||||||
field_info = Ferret::Index::FieldInfos.new(:store => :no,
|
field_infos = Ferret::Index::FieldInfos.new(:store => :no,
|
||||||
:term_vector => :no)
|
:term_vector => :no)
|
||||||
field_infos.add_field(:@LINE,
|
field_infos.add_field(:@LINE,
|
||||||
:store => :compressed,
|
:store => :compressed,
|
||||||
|
|
|
@ -22,7 +22,7 @@ module LogStash
|
||||||
end
|
end
|
||||||
|
|
||||||
@grok_patterns = config.delete(:grok_patterns)
|
@grok_patterns = config.delete(:grok_patterns)
|
||||||
@home = ENV["LOGSTASH_HOME"] || "/opt/logstash"
|
@home = ENV["LOGSTASH_DIR"] || "/opt/logstash"
|
||||||
|
|
||||||
if not File.exists?("#{@home}/patterns")
|
if not File.exists?("#{@home}/patterns")
|
||||||
throw StandardError.new("#{@home}/patterns/ does not exist")
|
throw StandardError.new("#{@home}/patterns/ does not exist")
|
||||||
|
|
|
@ -30,5 +30,6 @@ module LogStash; module Net; module Messages
|
||||||
|
|
||||||
# Message attributes
|
# Message attributes
|
||||||
hashbind :results, "/args/results"
|
hashbind :results, "/args/results"
|
||||||
|
hashbind :finished, "/args/finished"
|
||||||
end # class SearchResponse
|
end # class SearchResponse
|
||||||
end; end; end # module LogStash::Net::Messages
|
end; end; end # module LogStash::Net::Messages
|
||||||
|
|
|
@ -36,7 +36,7 @@ module LogStash; module Net; module Servers
|
||||||
else
|
else
|
||||||
response.code = 0
|
response.code = 0
|
||||||
if not @indexes.member?(log_type)
|
if not @indexes.member?(log_type)
|
||||||
@indexes[log_type] = @logs[log_type].get_index
|
@indexes[log_type] = $logs[log_type].get_index
|
||||||
end
|
end
|
||||||
|
|
||||||
entry["@LOG_TYPE"] = log_type
|
entry["@LOG_TYPE"] = log_type
|
||||||
|
@ -53,19 +53,16 @@ module LogStash; module Net; module Servers
|
||||||
end
|
end
|
||||||
|
|
||||||
def SearchRequestHandler(request)
|
def SearchRequestHandler(request)
|
||||||
response = LogStash::Net::Messages::SearchResponse.new
|
|
||||||
response.id = request.id
|
|
||||||
puts "Search for #{request.query.inspect}"
|
puts "Search for #{request.query.inspect}"
|
||||||
|
|
||||||
reader = Ferret::Index::IndexReader.new($logs[request.log_type].index_dir)
|
reader = Ferret::Index::IndexReader.new($logs[request.log_type].index_dir)
|
||||||
search = Ferret::Search::Searcher.new(reader)
|
search = Ferret::Search::Searcher.new(reader)
|
||||||
|
|
||||||
#puts reader.fields.join("\n")
|
puts reader.fields.join("\n")
|
||||||
qp = Ferret::QueryParser.new(:fields => reader.fields,
|
qp = Ferret::QueryParser.new(:fields => reader.fields,
|
||||||
:tokenized_fields => reader.tokenized_fields,
|
:tokenized_fields => reader.tokenized_fields,
|
||||||
:or_default => false)
|
:or_default => false)
|
||||||
query = qp.parse(request.query)
|
query = qp.parse(request.query)
|
||||||
response.results = []
|
|
||||||
search.search_each(query, :limit => :all,
|
search.search_each(query, :limit => :all,
|
||||||
:sort => "@DATE") do |docid, score|
|
:sort => "@DATE") do |docid, score|
|
||||||
result = reader[docid][:@LINE]
|
result = reader[docid][:@LINE]
|
||||||
|
@ -74,6 +71,11 @@ module LogStash; module Net; module Servers
|
||||||
response.results = [result]
|
response.results = [result]
|
||||||
yield response
|
yield response
|
||||||
end
|
end
|
||||||
|
response = LogStash::Net::Messages::SearchResponse.new
|
||||||
|
response.id = request.id
|
||||||
|
response.results = []
|
||||||
|
response.finished = true
|
||||||
|
yield response
|
||||||
end
|
end
|
||||||
|
|
||||||
# Special 'run' override because we want sync to disk once per minute.
|
# Special 'run' override because we want sync to disk once per minute.
|
||||||
|
|
|
@ -31,6 +31,9 @@ class Client < LogStash::Net::MessageSocketMux
|
||||||
msg.results.each do |result|
|
msg.results.each do |result|
|
||||||
puts result
|
puts result
|
||||||
end
|
end
|
||||||
|
if msg.finished
|
||||||
|
$done = true
|
||||||
|
end
|
||||||
#gotresponse(msg)
|
#gotresponse(msg)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -47,7 +50,8 @@ msg = LogStash::Net::Messages::SearchRequest.new
|
||||||
msg.log_type = ARGV[0]
|
msg.log_type = ARGV[0]
|
||||||
msg.query = ARGV[1]
|
msg.query = ARGV[1]
|
||||||
$me.sendmsg(msg)
|
$me.sendmsg(msg)
|
||||||
$done = true
|
|
||||||
$me.close()
|
$me.close()
|
||||||
$done = true
|
|
||||||
$me.run
|
while !$done
|
||||||
|
$me.sendrecv(10)
|
||||||
|
end
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue