MINOR: Cleanup dead code in PQ, fix unsafe resource handling in PQ pages

Fixes #9332
This commit is contained in:
Armin 2018-04-06 11:42:35 +02:00 committed by Armin Braun
parent 8c22f6d045
commit e07f19e9cf
8 changed files with 32 additions and 90 deletions

View file

@ -105,12 +105,6 @@ describe LogStash::WrappedAckedQueue, :stress_test => true do
output_strings.concat files output_strings.concat files
end end
begin
queue.queue.open
rescue Exception => e
output_strings << e.message
end
queue.close queue.close
if output_strings.any? if output_strings.any?

View file

@ -7,7 +7,6 @@ import org.logstash.Event;
import org.logstash.ext.JrubyEventExtLibrary; import org.logstash.ext.JrubyEventExtLibrary;
public final class AckedBatch { public final class AckedBatch {
private static final long serialVersionUID = -3118949118637372130L;
private Batch batch; private Batch batch;
public static AckedBatch create(Batch batch) { public static AckedBatch create(Batch batch) {

View file

@ -27,6 +27,7 @@ public class Batch implements Closeable {
} }
// close acks the batch ackable events // close acks the batch ackable events
@Override
public void close() throws IOException { public void close() throws IOException {
if (closed.getAndSet(true) == false) { if (closed.getAndSet(true) == false) {
this.queue.ack(this.seqNums); this.queue.ack(this.seqNums);

View file

@ -9,7 +9,6 @@ import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod; import org.jruby.anno.JRubyMethod;
import org.jruby.javasupport.JavaObject; import org.jruby.javasupport.JavaObject;
import org.jruby.runtime.Arity;
import org.jruby.runtime.ThreadContext; import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject; import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.Event; import org.logstash.Event;
@ -18,7 +17,6 @@ import org.logstash.ackedqueue.AckedBatch;
import org.logstash.ackedqueue.Batch; import org.logstash.ackedqueue.Batch;
import org.logstash.ackedqueue.Queue; import org.logstash.ackedqueue.Queue;
import org.logstash.ackedqueue.SettingsImpl; import org.logstash.ackedqueue.SettingsImpl;
import org.logstash.ext.JrubyEventExtLibrary;
@JRubyClass(name = "AckedQueue") @JRubyClass(name = "AckedQueue")
public final class JRubyAckedQueueExt extends RubyObject { public final class JRubyAckedQueueExt extends RubyObject {
@ -41,19 +39,6 @@ public final class JRubyAckedQueueExt extends RubyObject {
return queueExt; return queueExt;
} }
@JRubyMethod(name = "initialize", optional = 7)
public IRubyObject ruby_initialize(ThreadContext context, IRubyObject[] args) {
args = Arity.scanArgs(context.runtime, args, 7, 0);
int capacity = RubyFixnum.num2int(args[1]);
int maxUnread = RubyFixnum.num2int(args[2]);
int checkpointMaxAcks = RubyFixnum.num2int(args[3]);
int checkpointMaxWrites = RubyFixnum.num2int(args[4]);
long queueMaxBytes = RubyFixnum.num2long(args[6]);
initializeQueue(args[0].asJavaString(), capacity, maxUnread, checkpointMaxWrites, checkpointMaxAcks, queueMaxBytes);
return context.nil;
}
private void initializeQueue(String path, int capacity, int maxEvents, int checkpointMaxWrites, int checkpointMaxAcks, long maxBytes) { private void initializeQueue(String path, int capacity, int maxEvents, int checkpointMaxWrites, int checkpointMaxAcks, long maxBytes) {
this.queue = new Queue( this.queue = new Queue(
SettingsImpl.fileSettingsBuilder(path) SettingsImpl.fileSettingsBuilder(path)
@ -107,33 +92,16 @@ public final class JRubyAckedQueueExt extends RubyObject {
return context.runtime.newFixnum(queue.getUnreadCount()); return context.runtime.newFixnum(queue.getUnreadCount());
} }
@JRubyMethod(name = "open")
public IRubyObject ruby_open(ThreadContext context) {
try {
open();
} catch (IOException e) {
throw RubyUtil.newRubyIOError(context.runtime, e);
}
return context.nil;
}
public void open() throws IOException { public void open() throws IOException {
queue.open(); queue.open();
} }
@JRubyMethod(name = {"write", "<<"}, required = 1) public void rubyWrite(ThreadContext context, Event event) {
public IRubyObject ruby_write(ThreadContext context, IRubyObject event) {
if (!(event instanceof JrubyEventExtLibrary.RubyEvent)) {
throw context.runtime.newTypeError(
"wrong argument type " + event.getMetaClass() + " (expected LogStash::Event)");
}
long seqNum;
try { try {
seqNum = this.queue.write(((JrubyEventExtLibrary.RubyEvent) event).getEvent()); this.queue.write(event);
} catch (IOException e) { } catch (IOException e) {
throw RubyUtil.newRubyIOError(context.runtime, e); throw RubyUtil.newRubyIOError(context.runtime, e);
} }
return context.runtime.newFixnum(seqNum);
} }
@JRubyMethod(name = "read_batch", required = 2) @JRubyMethod(name = "read_batch", required = 2)
@ -159,11 +127,6 @@ public final class JRubyAckedQueueExt extends RubyObject {
return RubyBoolean.newBoolean(context.runtime, this.queue.isFullyAcked()); return RubyBoolean.newBoolean(context.runtime, this.queue.isFullyAcked());
} }
@JRubyMethod(name = "is_empty?")
public IRubyObject ruby_is_empty(ThreadContext context) {
return RubyBoolean.newBoolean(context.runtime, this.queue.isEmpty());
}
public boolean isEmpty() { public boolean isEmpty() {
return queue.isEmpty(); return queue.isEmpty();
} }

View file

@ -3,6 +3,7 @@ package org.logstash.ackedqueue.ext;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.jruby.Ruby; import org.jruby.Ruby;
import org.jruby.RubyBoolean;
import org.jruby.RubyClass; import org.jruby.RubyClass;
import org.jruby.RubyFixnum; import org.jruby.RubyFixnum;
import org.jruby.RubyObject; import org.jruby.RubyObject;
@ -14,6 +15,7 @@ import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil; import org.logstash.RubyUtil;
import org.logstash.ext.JrubyAckedReadClientExt; import org.logstash.ext.JrubyAckedReadClientExt;
import org.logstash.ext.JrubyAckedWriteClientExt; import org.logstash.ext.JrubyAckedWriteClientExt;
import org.logstash.ext.JrubyEventExtLibrary;
@JRubyClass(name = "WrappedAckedQueue") @JRubyClass(name = "WrappedAckedQueue")
public final class JRubyWrappedAckedQueueExt extends RubyObject { public final class JRubyWrappedAckedQueueExt extends RubyObject {
@ -62,9 +64,9 @@ public final class JRubyWrappedAckedQueueExt extends RubyObject {
} }
@JRubyMethod(name = {"push", "<<"}) @JRubyMethod(name = {"push", "<<"})
public void rubyPush(ThreadContext context, IRubyObject object) { public void rubyPush(ThreadContext context, IRubyObject event) {
checkIfClosed("write"); checkIfClosed("write");
queue.ruby_write(context, object); queue.rubyWrite(context, ((JrubyEventExtLibrary.RubyEvent) event).getEvent());
} }
@JRubyMethod(name = "read_batch") @JRubyMethod(name = "read_batch")
@ -86,7 +88,7 @@ public final class JRubyWrappedAckedQueueExt extends RubyObject {
@JRubyMethod(name = "is_empty?") @JRubyMethod(name = "is_empty?")
public IRubyObject rubyIsEmpty(ThreadContext context) { public IRubyObject rubyIsEmpty(ThreadContext context) {
return queue.ruby_is_empty(context); return RubyBoolean.newBoolean(context.runtime, this.queue.isEmpty());
} }
private void checkIfClosed(String action) { private void checkIfClosed(String action) {

View file

@ -24,8 +24,6 @@ public final class MmapPageIO implements PageIO {
public static final int SEQNUM_SIZE = Long.BYTES; public static final int SEQNUM_SIZE = Long.BYTES;
public static final int MIN_CAPACITY = VERSION_SIZE + SEQNUM_SIZE + LENGTH_SIZE + 1 + CHECKSUM_SIZE; // header overhead plus elements overhead to hold a single 1 byte element public static final int MIN_CAPACITY = VERSION_SIZE + SEQNUM_SIZE + LENGTH_SIZE + 1 + CHECKSUM_SIZE; // header overhead plus elements overhead to hold a single 1 byte element
public static final int HEADER_SIZE = 1; // version byte public static final int HEADER_SIZE = 1; // version byte
// Size of: Header + Sequence Number + Length + Checksum
public static final int WRAPPER_SIZE = HEADER_SIZE + SEQNUM_SIZE + LENGTH_SIZE + CHECKSUM_SIZE;
public static final boolean VERIFY_CHECKSUM = true; public static final boolean VERIFY_CHECKSUM = true;
private static final Logger LOGGER = LogManager.getLogger(MmapPageIO.class); private static final Logger LOGGER = LogManager.getLogger(MmapPageIO.class);
@ -42,8 +40,6 @@ public final class MmapPageIO implements PageIO {
private final IntVector offsetMap; private final IntVector offsetMap;
private FileChannel channel;
private int capacity; // page capacity is an int per the ByteBuffer class. private int capacity; // page capacity is an int per the ByteBuffer class.
private long minSeqNum; // TODO: to make minSeqNum final we have to pass in the minSeqNum in the constructor and not set it on first write private long minSeqNum; // TODO: to make minSeqNum final we have to pass in the minSeqNum in the constructor and not set it on first write
private int elementCount; private int elementCount;
@ -169,10 +165,9 @@ public final class MmapPageIO implements PageIO {
@Override @Override
public void create() throws IOException { public void create() throws IOException {
RandomAccessFile raf = new RandomAccessFile(this.file, "rw"); try (RandomAccessFile raf = new RandomAccessFile(this.file, "rw")) {
this.channel = raf.getChannel(); this.buffer = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, this.capacity);
this.buffer = this.channel.map(FileChannel.MapMode.READ_WRITE, 0, this.capacity); }
raf.close();
buffer.position(0); buffer.position(0);
buffer.put(VERSION_ONE); buffer.put(VERSION_ONE);
this.head = 1; this.head = 1;
@ -181,17 +176,16 @@ public final class MmapPageIO implements PageIO {
} }
@Override @Override
public void deactivate() throws IOException { public void deactivate() {
close(); // close can be called multiple times close(); // close can be called multiple times
} }
@Override @Override
public void activate() throws IOException { public void activate() throws IOException {
if (this.channel == null) { if (this.buffer == null) {
RandomAccessFile raf = new RandomAccessFile(this.file, "rw"); try (RandomAccessFile raf = new RandomAccessFile(this.file, "rw")) {
this.channel = raf.getChannel(); this.buffer = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, this.capacity);
this.buffer = this.channel.map(FileChannel.MapMode.READ_WRITE, 0, this.capacity); }
raf.close();
this.buffer.load(); this.buffer.load();
} }
// TODO: do we need to check is the channel is still open? not sure how it could be closed // TODO: do we need to check is the channel is still open? not sure how it could be closed
@ -215,19 +209,12 @@ public final class MmapPageIO implements PageIO {
} }
@Override @Override
public void close() throws IOException { public void close() {
if (this.buffer != null) { if (this.buffer != null) {
this.buffer.force(); this.buffer.force();
BUFFER_CLEANER.clean(buffer); BUFFER_CLEANER.clean(buffer);
} }
if (this.channel != null) {
if (this.channel.isOpen()) {
this.channel.force(false);
}
this.channel.close(); // close can be called multiple times
}
this.channel = null;
this.buffer = null; this.buffer = null;
} }
@ -274,23 +261,21 @@ public final class MmapPageIO implements PageIO {
// memory map data file to this.buffer and read initial version byte // memory map data file to this.buffer and read initial version byte
private void mapFile() throws IOException { private void mapFile() throws IOException {
RandomAccessFile raf = new RandomAccessFile(this.file, "rw"); try (RandomAccessFile raf = new RandomAccessFile(this.file, "rw")) {
if (raf.length() > Integer.MAX_VALUE) { if (raf.length() > Integer.MAX_VALUE) {
throw new IOException("Page file too large " + this.file); throw new IOException("Page file too large " + this.file);
}
int pageFileCapacity = (int) raf.length();
// update capacity to actual raf length. this can happen if a page size was changed on a non empty queue directory for example.
this.capacity = pageFileCapacity;
if (this.capacity < MIN_CAPACITY) {
throw new IOException(String.format("Page file size is too small to hold elements"));
}
this.buffer = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, this.capacity);
} }
int pageFileCapacity = (int) raf.length();
// update capacity to actual raf length. this can happen if a page size was changed on a non empty queue directory for example.
this.capacity = pageFileCapacity;
if (this.capacity < MIN_CAPACITY) {
throw new IOException(String.format("Page file size is too small to hold elements"));
}
this.channel = raf.getChannel();
this.buffer = this.channel.map(FileChannel.MapMode.READ_WRITE, 0, this.capacity);
raf.close();
this.buffer.load(); this.buffer.load();
} }

View file

@ -182,6 +182,4 @@ public abstract class QueueReadClientBase extends RubyObject implements QueueRea
} }
public abstract void close() throws IOException; public abstract void close() throws IOException;
public abstract boolean isEmpty();
} }

View file

@ -50,7 +50,7 @@ public final class JrubyAckedWriteClientExt extends RubyObject {
@JRubyMethod(name = {"push", "<<"}, required = 1) @JRubyMethod(name = {"push", "<<"}, required = 1)
public IRubyObject rubyPush(final ThreadContext context, IRubyObject event) { public IRubyObject rubyPush(final ThreadContext context, IRubyObject event) {
ensureOpen(); ensureOpen();
queue.ruby_write(context, event); queue.rubyWrite(context, ((JrubyEventExtLibrary.RubyEvent) event).getEvent());
return this; return this;
} }
@ -58,7 +58,7 @@ public final class JrubyAckedWriteClientExt extends RubyObject {
public IRubyObject rubyPushBatch(final ThreadContext context, IRubyObject batch) { public IRubyObject rubyPushBatch(final ThreadContext context, IRubyObject batch) {
ensureOpen(); ensureOpen();
for (final IRubyObject event : (Collection<JrubyEventExtLibrary.RubyEvent>) batch) { for (final IRubyObject event : (Collection<JrubyEventExtLibrary.RubyEvent>) batch) {
queue.ruby_write(context, event); queue.rubyWrite(context, ((JrubyEventExtLibrary.RubyEvent) event).getEvent());
} }
return this; return this;
} }