- Use elasticsearch again for an indexing backend.

- Update logstash web to hit elasticsearch (hardcoded host for now)
Jordan Sissel 2010-04-25 09:12:45 +00:00
parent 32ca7fbaea
commit 1cb56a94d4
5 changed files with 99 additions and 22 deletions
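
For orientation: everything in this commit talks to elasticsearch over plain HTTP. Events are indexed by POSTing JSON to http://host:9200/&lt;index&gt;/&lt;type&gt;, and the web UI reads them back through the _search and _count endpoints. A minimal sketch of the indexing call using only Ruby's standard library (the event hash contents are invented for illustration; "logstash" and "linux-syslog" are the index and type names used in the diff):

require "net/http"
require "json"
require "uri"

# Hypothetical event; the agent builds these from grok matches.
entry = { "@LINE" => "Apr 25 09:12:45 frontend1 etl-cron[123]: job finished" }

# Same call shape as elastic_index below: POST the JSON document to
# http://localhost:9200/logstash/linux-syslog and let elasticsearch
# assign the document id.
uri = URI.parse("http://localhost:9200/logstash/linux-syslog")
http = Net::HTTP.new(uri.host, uri.port)
response = http.post(uri.path, entry.to_json)
puts response.code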


@@ -7,8 +7,10 @@ require 'ap'
 require 'socket' # for Socket.gethostname
 require 'eventmachine'
 require 'eventmachine-tail'
+require 'em-http'

-AMOUNT = 500
+PROGRESS_AMOUNT = 500

 class GrokReader < EventMachine::FileTail
   def initialize(path, agent)
     super(path)
@@ -80,25 +82,38 @@ module LogStash; module Programs;
   private
   def index(name, entry)
-    logstash_index(name, entry)
+    #logstash_index(name, entry)
+    elastic_index(name, entry)
   end

   def logstash_index(name, entry)
     @index.index(entry)
     @count += 1
-    if @count % AMOUNT == 0
+    if @count % PROGRESS_AMOUNT == 0
       #flush_indexes
       #puts "match #{name} in #{path}: #{line}"
-      puts "count: #{@count} #{AMOUNT / (Time.now - @start)}"
+      puts "count: #{@count} #{PROGRESS_AMOUNT / (Time.now - @start)}"
       @start = Time.now
     end
   end

+  def elastic_index(name, entry)
+    http = EventMachine::HttpRequest.new("http://localhost:9200/logstash/#{name}")
+    req = http.post :body => entry.to_json
+    @count += 1
+    puts "count: #{@count} #{PROGRESS_AMOUNT / (Time.now - @start)}"
+    req.callback do
+      if @count % PROGRESS_AMOUNT == 0
+        @start = Time.now
+      end
+    end
+  end
+
   def ferret_index(name, entry)
     @indexes[name] << entry
     @needs_flushing << name
     @count += 1
-    if @count % AMOUNT == 0
+    if @count % PROGRESS_AMOUNT == 0
       #flush_indexes
       #puts "match #{name} in #{path}: #{line}"
-      puts "count: #{@count} #{AMOUNT / (Time.now - @start)}"
+      puts "count: #{@count} #{PROGRESS_AMOUNT / (Time.now - @start)}"


@@ -1,32 +1,41 @@
-$: << ".."
-require "lib/net/clients/search"
+#$: << ".."
+#require "lib/net/clients/search"
 require "timeout"
+require "elasticsearch"

 class Search < Application
   def index
     render
   end

   def query
+    q = {}
     params[:log_type] = (params[:log_type] or "linux-syslog")
     params[:offset] = (params[:offset] ? params[:offset].to_i : 0) rescue 0
     params[:limit] = (params[:limit] ? params[:limit].to_i : 100) rescue 100
     params[:log_type] = (params[:log_type] or "linux-syslog")
     params[:query] = params[:q]

+    q[:from] = params[:offset]
+    q[:size] = params[:limit]
+    q[:log_type] = params[:log_type]
+    q[:base] = "logstash"
+    q[:q] = params[:q]
+
+    search = ElasticSearch.new("localhost:9200")
     Timeout.timeout(10) do
-      @hits, @results = $search.search(params)
-      @graphdata = _graphpoints
+      #@hits, @results = $search.search(params)
+      results = search.query(q)
+      @hits = results.hits
+      @results = results.results
+      @graphdata = _graphpoints(search, q)
       render
     end
   end

-  def _graphpoints
+  def _graphpoints(search, query)
     #provides :json
     params[:log_type] = (params[:log_type] or "linux-syslog")
-    orig_query = params[:q]
+    orig_query = query[:q]
     day = 60 * 60 * 24
     hour = 60 * 60
@@ -50,11 +59,11 @@ class Search < Application
       curtime += increment
     end

-    hits = $search.searchhits(params[:log_type], queries.keys)
-    #puts queries.inspect
-    hits.each do |key,count|
-      #puts "query: #{queries.has_key?(key)} / #{key} "
-      queries[key][:hits] = count
+    queries.each do |genquery, data|
+      hitq = query.clone
+      hitq[:q] = genquery
+      count = search.count(hitq)
+      queries[genquery][:hits] = count
     end

     @data = Hash.new
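
With this change, _graphpoints resolves its per-time-bucket hit counts by issuing one _count request per generated query instead of one batched searchhits RPC. The loop shape, extracted as a runnable sketch against the new client (the bucket query strings are hypothetical; a real run generates one per time increment):

$: << "web/lib"           # assuming we run from the repository root
require "elasticsearch"   # the web/lib/elasticsearch.rb added below

search = ElasticSearch.new("localhost:9200")
base = { :base => "logstash", :log_type => "linux-syslog" }

# Hypothetical generated queries; _graphpoints derives these from the
# original query plus a timestamp range for each increment.
queries = {
  "progname:etl-cron" => {},
  "progname:sshd"     => {},
}

queries.each do |genquery, data|
  hitq = base.clone
  hitq[:q] = genquery
  data[:hits] = search.count(hitq)  # one HTTP _count round-trip per bucket
end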


@@ -39,7 +39,7 @@
 <% end %>

 <pre>
-<%=h @results.sort_by { |r| r[0] == nil ? 0 : r[0] }.collect { |r| r[1] }.join("\n") %>
+<%=h @results.collect { |v| v["_source"]["@LINE"] }.join("\n") %>
 </pre>

 <script id="source">
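
The template now renders each hit's raw log line straight out of the elasticsearch _source document rather than a (score, line) tuple. Roughly what one element of @results looks like after this commit (field values invented for illustration):

# One elasticsearch hit, as returned under data["hits"]["hits"];
# the view above reads the "@LINE" field of its _source.
hit = {
  "_index"  => "logstash",
  "_type"   => "linux-syslog",
  "_id"     => "AbC123",
  "_source" => { "@LINE" => "Apr 25 09:12:45 frontend1 sshd[42]: session opened" },
}
puts hit["_source"]["@LINE"]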


@@ -24,5 +24,5 @@ end
 Merb::BootLoader.after_app_loads do
   # This will get executed after your app's classes have been loaded.
-  $search = LogStash::Net::Clients::Search.new("/opt/logstash/etc/logstashd.yaml")
+  #$search = LogStash::Net::Clients::Search.new("/opt/logstash/etc/logstashd.yaml")
 end

web/lib/elasticsearch.rb (new file, 53 lines)

@@ -0,0 +1,53 @@
+require "rubygems"
+require "uri"
+require "json"
+require "logger"
+require "httpclient"
+
+class ElasticSearch
+  def initialize(host)
+    @host = host
+    @http = HTTPClient.new
+    @logger = Logger.new(STDERR)
+  end
+
+  def _get(query, what)
+    index = URI.escape("#{query[:base]}/#{query[:log_type]}")
+    uri = "http://#{@host}/#{index}/_#{what}?"
+    params = query.collect { |k,v| "#{URI.escape(k.to_s)}=#{URI.escape(v.to_s)}" }.join("&")
+    uri += "#{params}"
+    @logger.info("URL for #{what}: #{uri}")
+    response = @http.get(uri)
+    if response.status != 200
+      p JSON.parse(response.content)
+      raise "Search failure (http code #{response.code})"
+    end
+    return JSON.parse(response.content)
+  end
+
+  def query(query)
+    return ElasticSearch::SearchResults.new(_get(query, "search"))
+  end # def query
+
+  def count(query)
+    return _get(query, "count")["count"]
+  end
+end
+
+class ElasticSearch::SearchResults
+  attr_reader :hits
+  attr_reader :results
+
+  def initialize(data)
+    @hits = data["hits"]["total"]
+    @results = data["hits"]["hits"]
+  end
+end
+
+if __FILE__ == $0
+  require "ap"
+  es = ElasticSearch.new("localhost:9200")
+  ap es.query(:base => "logstash", :log_type => "linux-syslog", :q => "progname:etl-cron").results
+end
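
The __FILE__ == $0 block gives the query path a smoke test; the count path can be exercised the same way (the query string mirrors the one in the self-test above):

es = ElasticSearch.new("localhost:9200")
puts es.count(:base => "logstash", :log_type => "linux-syslog", :q => "progname:etl-cron")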