namespace memory checkpoints for multiple queues support

memory checkpoint IO need the purge() method
This commit is contained in:
Colin Surprenant 2017-02-21 11:44:50 -05:00
parent 8a8f04ab1d
commit 8c000445c2
3 changed files with 66 additions and 6 deletions

View file

@ -1,6 +1,7 @@
package org.logstash.common.io; package org.logstash.common.io;
import org.logstash.ackedqueue.Checkpoint; import org.logstash.ackedqueue.Checkpoint;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -71,6 +72,7 @@ public class FileCheckpointIO implements CheckpointIO {
@Override @Override
public void purge() throws IOException { public void purge() throws IOException {
// TODO: dir traversal and delete all checkpoints? // TODO: dir traversal and delete all checkpoints?
throw new NotImplementedException();
} }
// @return the head page checkpoint file name // @return the head page checkpoint file name

View file

@ -1,9 +1,12 @@
package org.logstash.common.io; package org.logstash.common.io;
import org.logstash.ackedqueue.Checkpoint; import org.logstash.ackedqueue.Checkpoint;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import java.io.IOException; import java.io.IOException;
import java.nio.file.NoSuchFileException; import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -12,7 +15,7 @@ public class MemoryCheckpointIO implements CheckpointIO {
private final String HEAD_CHECKPOINT = "checkpoint.head"; private final String HEAD_CHECKPOINT = "checkpoint.head";
private final String TAIL_CHECKPOINT = "checkpoint."; private final String TAIL_CHECKPOINT = "checkpoint.";
private static final Map<String, Checkpoint> sources = new HashMap<>(); private static final Map<String, Map<String, Checkpoint>> sources = new HashMap<>();
private final String dirPath; private final String dirPath;
@ -26,8 +29,13 @@ public class MemoryCheckpointIO implements CheckpointIO {
@Override @Override
public Checkpoint read(String fileName) throws IOException { public Checkpoint read(String fileName) throws IOException {
Checkpoint cp = this.sources.get(fileName);
if (cp == null) { throw new NoSuchFileException("no memory checkpoint for " + fileName); } Checkpoint cp = null;
Map<String, Checkpoint> ns = this.sources.get(dirPath);
if (ns != null) {
cp = ns.get(fileName);
}
if (cp == null) { throw new NoSuchFileException("no memory checkpoint for dirPath: " + this.dirPath + ", fileName: " + fileName); }
return cp; return cp;
} }
@ -40,17 +48,25 @@ public class MemoryCheckpointIO implements CheckpointIO {
@Override @Override
public void write(String fileName, Checkpoint checkpoint) throws IOException { public void write(String fileName, Checkpoint checkpoint) throws IOException {
this.sources.put(fileName, checkpoint); Map<String, Checkpoint> ns = this.sources.get(dirPath);
if (ns == null) {
ns = new HashMap<>();
this.sources.put(this.dirPath, ns);
}
ns.put(fileName, checkpoint);
} }
@Override @Override
public void purge(String fileName) { public void purge(String fileName) {
this.sources.remove(fileName); Map<String, Checkpoint> ns = this.sources.get(dirPath);
if (ns != null) {
ns.remove(fileName);
}
} }
@Override @Override
public void purge() { public void purge() {
this.sources.clear(); this.sources.remove(this.dirPath);
} }
// @return the head page checkpoint file name // @return the head page checkpoint file name

View file

@ -43,4 +43,46 @@ public class MemoryCheckpointTest {
public void readInnexisting() throws IOException { public void readInnexisting() throws IOException {
io.read("checkpoint.invalid"); io.read("checkpoint.invalid");
} }
@Test
public void readWriteDirPathNamespaced() throws IOException {
CheckpointIO io1 = new MemoryCheckpointIO("path1");
CheckpointIO io2 = new MemoryCheckpointIO("path2");
io1.write("checkpoint.head", 1, 0, 0, 0, 0);
io2.write("checkpoint.head", 2, 0, 0, 0, 0);
Checkpoint checkpoint;
checkpoint = io1.read("checkpoint.head");
assertThat(checkpoint.getPageNum(), is(equalTo(1)));
checkpoint = io2.read("checkpoint.head");
assertThat(checkpoint.getPageNum(), is(equalTo(2)));
}
@Test(expected = NoSuchFileException.class)
public void purgeDirPathNamespaced1() throws IOException {
CheckpointIO io1 = new MemoryCheckpointIO("path1");
CheckpointIO io2 = new MemoryCheckpointIO("path2");
io1.write("checkpoint.head", 1, 0, 0, 0, 0);
io2.write("checkpoint.head", 2, 0, 0, 0, 0);
io1.purge("checkpoint.head");
Checkpoint checkpoint = io1.read("checkpoint.head");
}
@Test
public void purgeDirPathNamespaced2() throws IOException {
CheckpointIO io1 = new MemoryCheckpointIO("path1");
CheckpointIO io2 = new MemoryCheckpointIO("path2");
io1.write("checkpoint.head", 1, 0, 0, 0, 0);
io2.write("checkpoint.head", 2, 0, 0, 0, 0);
io1.purge("checkpoint.head");
Checkpoint checkpoint;
checkpoint = io2.read("checkpoint.head");
assertThat(checkpoint.getPageNum(), is(equalTo(2)));
}
} }