#7111 cleaned up one case of redundant CRC32 digest and byte[] creation. Adjusted benchmark checkpoint interval to default value to be able to judge performance impact of the move.

Fixes #7166
This commit is contained in:
Armin 2017-05-19 16:34:06 +02:00 committed by Armin Braun
parent 1ea449f5a8
commit c80eabbc9d
2 changed files with 33 additions and 23 deletions

View file

@ -87,8 +87,8 @@ public class QueueBenchmark {
.capacity(256 * 1024 * 1024) .capacity(256 * 1024 * 1024)
.queueMaxBytes(Long.MAX_VALUE) .queueMaxBytes(Long.MAX_VALUE)
.elementIOFactory(MmapPageIO::new) .elementIOFactory(MmapPageIO::new)
.checkpointMaxWrites(50_000) .checkpointMaxWrites(1024)
.checkpointMaxAcks(50_000) .checkpointMaxAcks(1024)
.checkpointIOFactory(FileCheckpointIO::new) .checkpointIOFactory(FileCheckpointIO::new)
.elementClass(Event.class).build(); .elementClass(Event.class).build();
} }

View file

@ -1,21 +1,18 @@
package org.logstash.ackedqueue.io; package org.logstash.ackedqueue.io;
import java.nio.channels.FileChannel;
import org.logstash.ackedqueue.Checkpoint; import org.logstash.ackedqueue.Checkpoint;
import org.logstash.common.io.BufferedChecksumStreamInput; import org.logstash.common.io.BufferedChecksumStreamInput;
import org.logstash.common.io.BufferedChecksumStreamOutput;
import org.logstash.common.io.ByteArrayStreamOutput;
import org.logstash.common.io.InputStreamStreamInput; import org.logstash.common.io.InputStreamStreamInput;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import static java.nio.file.StandardOpenOption.CREATE; import java.util.zip.CRC32;
import static java.nio.file.StandardOpenOption.WRITE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
import static java.nio.file.StandardOpenOption.DSYNC;
public class FileCheckpointIO implements CheckpointIO { public class FileCheckpointIO implements CheckpointIO {
// Checkpoint file structure // Checkpoint file structure
@ -35,9 +32,16 @@ public class FileCheckpointIO implements CheckpointIO {
+ Integer.BYTES // eventCount + Integer.BYTES // eventCount
+ Integer.BYTES; // checksum + Integer.BYTES; // checksum
/**
* Using {@link java.nio.DirectByteBuffer} to avoid allocations and copying in
* {@link FileChannel#write(ByteBuffer)} and {@link CRC32#update(ByteBuffer)} calls.
*/
private final ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
private final CRC32 crc32 = new CRC32();
private static final String HEAD_CHECKPOINT = "checkpoint.head"; private static final String HEAD_CHECKPOINT = "checkpoint.head";
private static final String TAIL_CHECKPOINT = "checkpoint."; private static final String TAIL_CHECKPOINT = "checkpoint.";
private static final OpenOption[] WRITE_OPTIONS = { WRITE, CREATE, TRUNCATE_EXISTING, DSYNC };
private final String dirPath; private final String dirPath;
public FileCheckpointIO(String dirPath) { public FileCheckpointIO(String dirPath) {
@ -64,10 +68,12 @@ public class FileCheckpointIO implements CheckpointIO {
@Override @Override
public void write(String fileName, Checkpoint checkpoint) throws IOException { public void write(String fileName, Checkpoint checkpoint) throws IOException {
Path path = Paths.get(dirPath, fileName);
final byte[] buffer = new byte[BUFFER_SIZE];
write(checkpoint, buffer); write(checkpoint, buffer);
Files.write(path, buffer, WRITE_OPTIONS); buffer.flip();
try (FileOutputStream out = new FileOutputStream(Paths.get(dirPath, fileName).toFile())) {
out.getChannel().write(buffer);
out.getFD().sync();
}
} }
@Override @Override
@ -115,14 +121,18 @@ public class FileCheckpointIO implements CheckpointIO {
return new Checkpoint(pageNum, firstUnackedPageNum, firstUnackedSeqNum, minSeqNum, elementCount); return new Checkpoint(pageNum, firstUnackedPageNum, firstUnackedSeqNum, minSeqNum, elementCount);
} }
private void write(Checkpoint checkpoint, byte[] buf) throws IOException { private void write(Checkpoint checkpoint, ByteBuffer buf) {
BufferedChecksumStreamOutput output = new BufferedChecksumStreamOutput(new ByteArrayStreamOutput(buf)); crc32.reset();
output.writeShort((short)Checkpoint.VERSION); buf.clear();
output.writeInt(checkpoint.getPageNum()); buf.putShort((short)Checkpoint.VERSION);
output.writeInt(checkpoint.getFirstUnackedPageNum()); buf.putInt(checkpoint.getPageNum());
output.writeLong(checkpoint.getFirstUnackedSeqNum()); buf.putInt(checkpoint.getFirstUnackedPageNum());
output.writeLong(checkpoint.getMinSeqNum()); buf.putLong(checkpoint.getFirstUnackedSeqNum());
output.writeInt(checkpoint.getElementCount()); buf.putLong(checkpoint.getMinSeqNum());
output.writeInt((int)output.getChecksum()); buf.putInt(checkpoint.getElementCount());
buf.flip();
crc32.update(buf);
buf.position(BUFFER_SIZE - Integer.BYTES).limit(BUFFER_SIZE);
buf.putInt((int)crc32.getValue());
} }
} }