diff --git a/logstash-core/src/main/java/org/logstash/common/io/FileCheckpointIO.java b/logstash-core/src/main/java/org/logstash/common/io/FileCheckpointIO.java index 5388003fc..4e99be58e 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/FileCheckpointIO.java +++ b/logstash-core/src/main/java/org/logstash/common/io/FileCheckpointIO.java @@ -1,6 +1,7 @@ package org.logstash.common.io; import org.logstash.ackedqueue.Checkpoint; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; import java.io.IOException; import java.io.InputStream; @@ -71,6 +72,7 @@ public class FileCheckpointIO implements CheckpointIO { @Override public void purge() throws IOException { // TODO: dir traversal and delete all checkpoints? + throw new NotImplementedException(); } // @return the head page checkpoint file name diff --git a/logstash-core/src/main/java/org/logstash/common/io/MemoryCheckpointIO.java b/logstash-core/src/main/java/org/logstash/common/io/MemoryCheckpointIO.java index 409095eb4..57f4af03d 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/MemoryCheckpointIO.java +++ b/logstash-core/src/main/java/org/logstash/common/io/MemoryCheckpointIO.java @@ -1,9 +1,12 @@ package org.logstash.common.io; import org.logstash.ackedqueue.Checkpoint; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; import java.io.IOException; import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.HashMap; import java.util.Map; @@ -12,7 +15,7 @@ public class MemoryCheckpointIO implements CheckpointIO { private final String HEAD_CHECKPOINT = "checkpoint.head"; private final String TAIL_CHECKPOINT = "checkpoint."; - private static final Map sources = new HashMap<>(); + private static final Map> sources = new HashMap<>(); private final String dirPath; @@ -26,8 +29,13 @@ public class MemoryCheckpointIO implements CheckpointIO { @Override public Checkpoint read(String fileName) throws IOException { - Checkpoint cp = this.sources.get(fileName); - if (cp == null) { throw new NoSuchFileException("no memory checkpoint for " + fileName); } + + Checkpoint cp = null; + Map ns = this.sources.get(dirPath); + if (ns != null) { + cp = ns.get(fileName); + } + if (cp == null) { throw new NoSuchFileException("no memory checkpoint for dirPath: " + this.dirPath + ", fileName: " + fileName); } return cp; } @@ -40,17 +48,25 @@ public class MemoryCheckpointIO implements CheckpointIO { @Override public void write(String fileName, Checkpoint checkpoint) throws IOException { - this.sources.put(fileName, checkpoint); + Map ns = this.sources.get(dirPath); + if (ns == null) { + ns = new HashMap<>(); + this.sources.put(this.dirPath, ns); + } + ns.put(fileName, checkpoint); } @Override public void purge(String fileName) { - this.sources.remove(fileName); + Map ns = this.sources.get(dirPath); + if (ns != null) { + ns.remove(fileName); + } } @Override public void purge() { - this.sources.clear(); + this.sources.remove(this.dirPath); } // @return the head page checkpoint file name diff --git a/logstash-core/src/test/java/org/logstash/common/io/MemoryCheckpointTest.java b/logstash-core/src/test/java/org/logstash/common/io/MemoryCheckpointTest.java index 843b73713..a77d8586d 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/MemoryCheckpointTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/MemoryCheckpointTest.java @@ -43,4 +43,46 @@ public class MemoryCheckpointTest { public void readInnexisting() throws IOException { io.read("checkpoint.invalid"); } + + @Test + public void readWriteDirPathNamespaced() throws IOException { + CheckpointIO io1 = new MemoryCheckpointIO("path1"); + CheckpointIO io2 = new MemoryCheckpointIO("path2"); + io1.write("checkpoint.head", 1, 0, 0, 0, 0); + io2.write("checkpoint.head", 2, 0, 0, 0, 0); + + Checkpoint checkpoint; + + checkpoint = io1.read("checkpoint.head"); + assertThat(checkpoint.getPageNum(), is(equalTo(1))); + + checkpoint = io2.read("checkpoint.head"); + assertThat(checkpoint.getPageNum(), is(equalTo(2))); + } + + @Test(expected = NoSuchFileException.class) + public void purgeDirPathNamespaced1() throws IOException { + CheckpointIO io1 = new MemoryCheckpointIO("path1"); + CheckpointIO io2 = new MemoryCheckpointIO("path2"); + io1.write("checkpoint.head", 1, 0, 0, 0, 0); + io2.write("checkpoint.head", 2, 0, 0, 0, 0); + + io1.purge("checkpoint.head"); + + Checkpoint checkpoint = io1.read("checkpoint.head"); + } + + @Test + public void purgeDirPathNamespaced2() throws IOException { + CheckpointIO io1 = new MemoryCheckpointIO("path1"); + CheckpointIO io2 = new MemoryCheckpointIO("path2"); + io1.write("checkpoint.head", 1, 0, 0, 0, 0); + io2.write("checkpoint.head", 2, 0, 0, 0, 0); + + io1.purge("checkpoint.head"); + + Checkpoint checkpoint; + checkpoint = io2.read("checkpoint.head"); + assertThat(checkpoint.getPageNum(), is(equalTo(2))); + } }