From c80eabbc9d1e17bb9f6b0b6d717ac7a0cc82c08d Mon Sep 17 00:00:00 2001 From: Armin Date: Fri, 19 May 2017 16:34:06 +0200 Subject: [PATCH] #7111 cleaned up one case of redundant CRC32 digest and byte[] creation. Adjusted benchmark checkpoint interval to default value to be able to judge performance impact of the move. Fixes #7166 --- .../logstash/benchmark/QueueBenchmark.java | 4 +- .../ackedqueue/io/FileCheckpointIO.java | 52 +++++++++++-------- 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueBenchmark.java b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueBenchmark.java index 112e9f47d..7489258bf 100644 --- a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueBenchmark.java +++ b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/QueueBenchmark.java @@ -87,8 +87,8 @@ public class QueueBenchmark { .capacity(256 * 1024 * 1024) .queueMaxBytes(Long.MAX_VALUE) .elementIOFactory(MmapPageIO::new) - .checkpointMaxWrites(50_000) - .checkpointMaxAcks(50_000) + .checkpointMaxWrites(1024) + .checkpointMaxAcks(1024) .checkpointIOFactory(FileCheckpointIO::new) .elementClass(Event.class).build(); } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/io/FileCheckpointIO.java b/logstash-core/src/main/java/org/logstash/ackedqueue/io/FileCheckpointIO.java index 700e206a0..0c7d91f2a 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/io/FileCheckpointIO.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/io/FileCheckpointIO.java @@ -1,23 +1,20 @@ package org.logstash.ackedqueue.io; +import java.nio.channels.FileChannel; import org.logstash.ackedqueue.Checkpoint; import org.logstash.common.io.BufferedChecksumStreamInput; -import org.logstash.common.io.BufferedChecksumStreamOutput; -import org.logstash.common.io.ByteArrayStreamOutput; import org.logstash.common.io.InputStreamStreamInput; import java.io.ByteArrayInputStream; +import java.io.FileOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.Files; -import java.nio.file.OpenOption; import java.nio.file.Path; import java.nio.file.Paths; -import static java.nio.file.StandardOpenOption.CREATE; -import static java.nio.file.StandardOpenOption.WRITE; -import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; -import static java.nio.file.StandardOpenOption.DSYNC; +import java.util.zip.CRC32; -public class FileCheckpointIO implements CheckpointIO { +public class FileCheckpointIO implements CheckpointIO { // Checkpoint file structure // // byte version; @@ -35,9 +32,16 @@ public class FileCheckpointIO implements CheckpointIO { + Integer.BYTES // eventCount + Integer.BYTES; // checksum + /** + * Using {@link java.nio.DirectByteBuffer} to avoid allocations and copying in + * {@link FileChannel#write(ByteBuffer)} and {@link CRC32#update(ByteBuffer)} calls. + */ + private final ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE); + + private final CRC32 crc32 = new CRC32(); + private static final String HEAD_CHECKPOINT = "checkpoint.head"; private static final String TAIL_CHECKPOINT = "checkpoint."; - private static final OpenOption[] WRITE_OPTIONS = { WRITE, CREATE, TRUNCATE_EXISTING, DSYNC }; private final String dirPath; public FileCheckpointIO(String dirPath) { @@ -64,10 +68,12 @@ public class FileCheckpointIO implements CheckpointIO { @Override public void write(String fileName, Checkpoint checkpoint) throws IOException { - Path path = Paths.get(dirPath, fileName); - final byte[] buffer = new byte[BUFFER_SIZE]; write(checkpoint, buffer); - Files.write(path, buffer, WRITE_OPTIONS); + buffer.flip(); + try (FileOutputStream out = new FileOutputStream(Paths.get(dirPath, fileName).toFile())) { + out.getChannel().write(buffer); + out.getFD().sync(); + } } @Override @@ -115,14 +121,18 @@ public class FileCheckpointIO implements CheckpointIO { return new Checkpoint(pageNum, firstUnackedPageNum, firstUnackedSeqNum, minSeqNum, elementCount); } - private void write(Checkpoint checkpoint, byte[] buf) throws IOException { - BufferedChecksumStreamOutput output = new BufferedChecksumStreamOutput(new ByteArrayStreamOutput(buf)); - output.writeShort((short)Checkpoint.VERSION); - output.writeInt(checkpoint.getPageNum()); - output.writeInt(checkpoint.getFirstUnackedPageNum()); - output.writeLong(checkpoint.getFirstUnackedSeqNum()); - output.writeLong(checkpoint.getMinSeqNum()); - output.writeInt(checkpoint.getElementCount()); - output.writeInt((int)output.getChecksum()); + private void write(Checkpoint checkpoint, ByteBuffer buf) { + crc32.reset(); + buf.clear(); + buf.putShort((short)Checkpoint.VERSION); + buf.putInt(checkpoint.getPageNum()); + buf.putInt(checkpoint.getFirstUnackedPageNum()); + buf.putLong(checkpoint.getFirstUnackedSeqNum()); + buf.putLong(checkpoint.getMinSeqNum()); + buf.putInt(checkpoint.getElementCount()); + buf.flip(); + crc32.update(buf); + buf.position(BUFFER_SIZE - Integer.BYTES).limit(BUFFER_SIZE); + buf.putInt((int)crc32.getValue()); } }