Deps: un-pin (and avoid) rufus-scheduler (#14260)

+ Refactor: specific require + scope java_import
+ Refactor: redundant requires
+ Refactor: avoid rufus - hook up a timer task
This commit is contained in:
Karol Bucek 2022-06-21 12:26:03 +02:00 committed by GitHub
parent ce27e08eac
commit 989f9e7937
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 45 additions and 21 deletions

View file

@ -77,11 +77,13 @@ Gem::Specification.new do |gem|
gem.add_development_dependency 'logstash-filter-geoip', '>= 7.2.1' # breaking change of DatabaseManager gem.add_development_dependency 'logstash-filter-geoip', '>= 7.2.1' # breaking change of DatabaseManager
gem.add_dependency 'down', '~> 5.2.0' #(MIT license) gem.add_dependency 'down', '~> 5.2.0' #(MIT license)
gem.add_dependency 'tzinfo-data' #(MIT license) gem.add_dependency 'tzinfo-data' #(MIT license)
# TEMPORARY: Modern Rufus Scheduler 3.x subtly breaks thread joining, which
# is done in several plugins to handle shutdowns. # NOTE: plugins now avoid using **rufus-scheduler** directly, if logstash-core would find itself in a need
# Pin pending migration to shared Scheduler Mixin that can mitigate this issue. # to use rufus than preferably the **logstash-mixin-scheduler** should be changed to work with non-plugins.
# https://github.com/logstash-plugins/logstash-mixin-scheduler/pull/1 #
gem.add_runtime_dependency 'rufus-scheduler', '~> 3.0.9' #(MIT license) # Using the scheduler directly might lead to issues e.g. when join-ing, see:
# https://github.com/logstash-plugins/logstash-mixin-scheduler/blob/v1.0.1/lib/logstash/plugin_mixins/scheduler/rufus_impl.rb#L85=
# and https://github.com/elastic/logstash/issues/13773
# TEMPORARY: racc-1.6.0 doesn't have JAVA counterpart (yet) # TEMPORARY: racc-1.6.0 doesn't have JAVA counterpart (yet)
# SEE: https://github.com/ruby/racc/issues/172 # SEE: https://github.com/ruby/racc/issues/172

View file

@ -7,16 +7,12 @@ require_relative "util"
require_relative "database_metadata" require_relative "database_metadata"
require_relative "download_manager" require_relative "download_manager"
require_relative "database_metric" require_relative "database_metric"
require "faraday"
require "json" require "json"
require "zlib"
require "stud/try" require "stud/try"
require "down"
require "rufus/scheduler"
require "singleton" require "singleton"
require "concurrent" require "concurrent/array"
require "concurrent/timer_task"
require "thread" require "thread"
java_import org.apache.logging.log4j.ThreadContext
# The mission of DatabaseManager is to ensure the plugin running an up-to-date MaxMind database and # The mission of DatabaseManager is to ensure the plugin running an up-to-date MaxMind database and
# thus users are compliant with EULA. # thus users are compliant with EULA.
@ -35,10 +31,13 @@ module LogStash module Filters module Geoip class DatabaseManager
include LogStash::Filters::Geoip::Util include LogStash::Filters::Geoip::Util
include Singleton include Singleton
java_import org.apache.logging.log4j.ThreadContext
private private
def initialize def initialize
@triggered = false @triggered = false
@trigger_lock = Mutex.new @trigger_lock = Mutex.new
@download_interval = 24 * 60 * 60 # 24h
end end
def setup def setup
@ -205,21 +204,25 @@ module LogStash module Filters module Geoip class DatabaseManager
return if @triggered return if @triggered
setup setup
execute_download_job execute_download_job
# check database update periodically. trigger `call` method # check database update periodically:
@scheduler = Rufus::Scheduler.new({:max_work_threads => 1})
@scheduler.every('24h', self) @download_task = Concurrent::TimerTask.execute(execution_interval: @download_interval) do
LogStash::Util.set_thread_name 'geoip database download task'
database_update_check # every 24h
end
@triggered = true @triggered = true
end end
end end
public public
# scheduler callback # @note this method is expected to execute on a separate thread
def call(job, time) def database_update_check
logger.debug "scheduler runs database update check" logger.debug "running database update check"
ThreadContext.put("pipeline.id", nil) ThreadContext.put("pipeline.id", nil)
execute_download_job execute_download_job
end end
private :database_update_check
def subscribe_database_path(database_type, database_path, geoip_plugin) def subscribe_database_path(database_type, database_path, geoip_plugin)
if database_path.nil? if database_path.nil?

View file

@ -12,7 +12,6 @@ describe LogStash::Filters::Geoip do
let(:mock_geoip_plugin) { double("geoip_plugin") } let(:mock_geoip_plugin) { double("geoip_plugin") }
let(:mock_metadata) { double("database_metadata") } let(:mock_metadata) { double("database_metadata") }
let(:mock_download_manager) { double("download_manager") } let(:mock_download_manager) { double("download_manager") }
let(:mock_scheduler) { double("scheduler") }
let(:agent_metric) { LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new) } let(:agent_metric) { LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new) }
let(:database_metric) { LogStash::Filters::Geoip::DatabaseMetric.new(agent_metric) } let(:database_metric) { LogStash::Filters::Geoip::DatabaseMetric.new(agent_metric) }
let(:db_manager) do let(:db_manager) do
@ -21,7 +20,6 @@ describe LogStash::Filters::Geoip do
manager.send(:setup) manager.send(:setup)
manager.instance_variable_set(:@metadata, mock_metadata) manager.instance_variable_set(:@metadata, mock_metadata)
manager.instance_variable_set(:@download_manager, mock_download_manager) manager.instance_variable_set(:@download_manager, mock_download_manager)
manager.instance_variable_set(:@scheduler, mock_scheduler)
manager manager
end end
let(:logger) { double("Logger") } let(:logger) { double("Logger") }
@ -215,6 +213,27 @@ describe LogStash::Filters::Geoip do
end end
end end
context "periodic database update" do
it 'sets up periodic task when download triggered' do
db_manager.send :trigger_download
download_task = db_manager.instance_variable_get(:@download_task)
expect( download_task ).to_not be nil
expect( download_task.running? ).to be true
expect( download_task.execution_interval ).to eq 86_400
end
it 'executes download job after interval passes' do
db_manager.instance_variable_set(:@download_interval, 1.5)
db_manager.send :trigger_download
download_task = db_manager.instance_variable_get(:@download_task)
expect( download_task.running? ).to be true
expect( db_manager ).to receive :execute_download_job
sleep 2.0 # wait for task execution
end
end
context "check age" do context "check age" do
context "eula database" do context "eula database" do
let(:db_manager) do let(:db_manager) do
@ -362,7 +381,7 @@ describe LogStash::Filters::Geoip do
end end
context "shutdown" do context "shutdown" do
let(:db_manager) { manager = Class.new(LogStash::Filters::Geoip::DatabaseManager).instance } let(:db_manager) { Class.new(LogStash::Filters::Geoip::DatabaseManager).instance }
it "should unsubscribe gracefully" do it "should unsubscribe gracefully" do
db_manager.subscribe_database_path(CITY, default_city_db_path, mock_geoip_plugin) db_manager.subscribe_database_path(CITY, default_city_db_path, mock_geoip_plugin)
@ -371,7 +390,7 @@ describe LogStash::Filters::Geoip do
end end
context "database metric is not assigned" do context "database metric is not assigned" do
let(:db_manager) { manager = Class.new(LogStash::Filters::Geoip::DatabaseManager).instance } let(:db_manager) { Class.new(LogStash::Filters::Geoip::DatabaseManager).instance }
it "does not throw error" do it "does not throw error" do
allow(LogStash::Filters::Geoip::DatabaseManager).to receive(:logger).and_return(logger) allow(LogStash::Filters::Geoip::DatabaseManager).to receive(:logger).and_return(logger)