Separate scheduling of segment flushes from time (#15680) (#15697)

Introduces a new interface, SchedulerService, that abstracts the scheduling of DLQ segment flushes away from ScheduledExecutorService. Abstracting from time benefits testing: a test no longer has to wait for scheduled work to fire, it can make that work happen synchronously.

(cherry picked from commit 6b22cc8dd1)

Co-authored-by: Andrea Selva <selva.andre@gmail.com>
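
Illustrative only, not part of this commit: a minimal sketch of the testing benefit described above. Assuming a test class in the same (assumed) package as DeadLetterQueueWriter, since the new SchedulerService interface is package-private, a hand-rolled scheduler can capture the registered callback and run it on demand, so the flush check executes synchronously instead of after a real one-second tick. ManualScheduler is a hypothetical name; the test changes below add an equivalent helper named SynchronizedScheduledService.

package org.logstash.common.io; // assumed package of DeadLetterQueueWriter

// Hypothetical test double: runs the registered flush action only when asked.
class ManualScheduler implements DeadLetterQueueWriter.SchedulerService {

    private Runnable action;

    @Override
    public void repeatedAction(Runnable action) {
        // Capture the callback instead of handing it to a ScheduledExecutorService.
        this.action = action;
    }

    // Simulate one scheduler tick: execute the captured flush check synchronously.
    void tick() {
        if (action != null) {
            action.run();
        }
    }
}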
2 changed files with 97 additions and 16 deletions

DeadLetterQueueWriter.java

@@ -126,6 +126,44 @@ public final class DeadLetterQueueWriter implements Closeable {
     private volatile Optional<Path> oldestSegmentPath = Optional.empty();
     private final TemporalAmount retentionTime;
+    private final SchedulerService flusherService;
+
+    interface SchedulerService {
+
+        /**
+         * Register the callback action to invoke on every clock tick.
+         * */
+        void repeatedAction(Runnable action);
+    }
+
+    private static class FixedRateScheduler implements SchedulerService {
+
+        private final ScheduledExecutorService scheduledExecutor;
+
+        FixedRateScheduler() {
+            scheduledExecutor = Executors.newScheduledThreadPool(1, r -> {
+                Thread t = new Thread(r);
+                //Allow this thread to die when the JVM dies
+                t.setDaemon(true);
+                //Set the name
+                t.setName("dlq-flush-check");
+                return t;
+            });
+        }
+
+        @Override
+        public void repeatedAction(Runnable action) {
+            scheduledExecutor.scheduleAtFixedRate(action, 1L, 1L, TimeUnit.SECONDS);
+        }
+    }
+
+    private static class NoopScheduler implements SchedulerService {
+        @Override
+        public void repeatedAction(Runnable action) {
+            // Noop
+        }
+    }

     public static final class Builder {

         private final Path queuePath;
@@ -136,6 +174,7 @@ public final class DeadLetterQueueWriter implements Closeable {
         private QueueStorageType storageType = QueueStorageType.DROP_NEWER;
         private Duration retentionTime = null;
         private Clock clock = Clock.systemDefaultZone();
+        private SchedulerService customSchedulerService = null;

         private Builder(Path queuePath, long maxSegmentSize, long maxQueueSize, Duration flushInterval) {
             this(queuePath, maxSegmentSize, maxQueueSize, flushInterval, true);
@@ -165,8 +204,28 @@ public final class DeadLetterQueueWriter implements Closeable {
             return this;
         }

+        @VisibleForTesting
+        Builder flusherService(SchedulerService service) {
+            this.customSchedulerService = service;
+            return this;
+        }
+
         public DeadLetterQueueWriter build() throws IOException {
-            return new DeadLetterQueueWriter(queuePath, maxSegmentSize, maxQueueSize, flushInterval, storageType, retentionTime, clock, startScheduledFlusher);
+            if (customSchedulerService != null && startScheduledFlusher) {
+                throw new IllegalArgumentException("Both default scheduler and custom scheduler were defined, ");
+            }
+            SchedulerService schedulerService;
+            if (customSchedulerService != null) {
+                schedulerService = customSchedulerService;
+            } else {
+                if (startScheduledFlusher) {
+                    schedulerService = new FixedRateScheduler();
+                } else {
+                    schedulerService = new NoopScheduler();
+                }
+            }
+            return new DeadLetterQueueWriter(queuePath, maxSegmentSize, maxQueueSize, flushInterval, storageType, retentionTime, clock, schedulerService);
         }
     }
@@ -182,7 +241,7 @@ public final class DeadLetterQueueWriter implements Closeable {

     private DeadLetterQueueWriter(final Path queuePath, final long maxSegmentSize, final long maxQueueSize,
                                   final Duration flushInterval, final QueueStorageType storageType, final Duration retentionTime,
-                                  final Clock clock, boolean startScheduledFlusher) throws IOException {
+                                  final Clock clock, SchedulerService flusherService) throws IOException {
         this.clock = clock;

         this.fileLock = FileLockFactory.obtainLock(queuePath, LOCK_FILE);
@@ -202,9 +261,8 @@ public final class DeadLetterQueueWriter implements Closeable {
                 .max().orElse(0);
         nextWriter();
         this.lastEntryTimestamp = Timestamp.now();
-        if (startScheduledFlusher) {
-            createFlushScheduler();
-        }
+        this.flusherService = flusherService;
+        this.flusherService.repeatedAction(this::scheduledFlushCheck);
     }

     public boolean isOpen() {
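
For orientation, a hedged sketch (not part of the diff) of the two construction paths that build() now resolves, using only the entry points visible above: newBuilder keeps the background FixedRateScheduler, while newBuilderWithoutFlusher plus the test-only flusherService(...) hook hands flush scheduling to the caller, and combining a custom scheduler with the default scheduled flusher is rejected. The dir variable, the size arguments, and the ManualScheduler helper from the earlier sketch are placeholders.

// Default wiring: build() falls back to FixedRateScheduler, which runs the flush check every second.
DeadLetterQueueWriter writer = DeadLetterQueueWriter
        .newBuilder(dir, 10 * 1024 * 1024, 1024L * 1024 * 1024, Duration.ofSeconds(5))
        .build();

// Test wiring: no default flusher; flush checks run only when the custom scheduler is driven.
ManualScheduler scheduler = new ManualScheduler();
DeadLetterQueueWriter testWriter = DeadLetterQueueWriter
        .newBuilderWithoutFlusher(dir, 10 * 1024 * 1024, 1024L * 1024 * 1024)
        .flusherService(scheduler)
        .build();
scheduler.tick(); // runs the registered flush check synchronously

// Invalid: a custom scheduler together with the default flusher; build() throws IllegalArgumentException.
// DeadLetterQueueWriter.newBuilder(dir, 10 * 1024 * 1024, 1024L * 1024 * 1024, Duration.ofSeconds(5))
//         .flusherService(scheduler)
//         .build();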

DeadLetterQueueWriterAgeRetentionTest.java

@@ -67,16 +67,33 @@ public class DeadLetterQueueWriterAgeRetentionTest {
         }
     }

+    private static class SynchronizedScheduledService implements DeadLetterQueueWriter.SchedulerService {
+
+        private Runnable action;
+
+        @Override
+        public void repeatedAction(Runnable action) {
+            this.action = action;
+        }
+
+        void executeAction() {
+            action.run();
+        }
+    }
+
     private Path dir;

     @Rule
     public TemporaryFolder temporaryFolder = new TemporaryFolder();

+    private SynchronizedScheduledService synchScheduler;
+
     @Before
     public void setUp() throws Exception {
         dir = temporaryFolder.newFolder().toPath();
         final Clock pointInTimeFixedClock = Clock.fixed(Instant.parse("2022-02-22T10:20:30.00Z"), ZoneId.of("Europe/Rome"));
         fakeClock = new ForwardableClock(pointInTimeFixedClock);
+        synchScheduler = new SynchronizedScheduledService();
     }

     @Test
@@ -272,7 +289,7 @@ public class DeadLetterQueueWriterAgeRetentionTest {
     }

     @Test
-    public void testDLQWriterFlusherRemovesExpiredSegmentWhenCurrentWriterIsStale() throws IOException, InterruptedException {
+    public void testDLQWriterFlusherRemovesExpiredSegmentWhenCurrentWriterIsStale() throws IOException {
         final Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(
                 Collections.singletonMap("message", "Not so important content"));
@@ -298,9 +315,10 @@ public class DeadLetterQueueWriterAgeRetentionTest {
        // move forward 3 days, so that the first segment becomes eligible to be deleted by the age retention policy
        fakeClock.forward(Duration.ofDays(3));
        try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter
-                .newBuilder(dir, 10 * MB, 1 * GB, flushInterval)
+                .newBuilderWithoutFlusher(dir, 10 * MB, 1 * GB)
                .retentionTime(retainedPeriod)
                .clock(fakeClock)
+                .flusherService(synchScheduler)
                .build()) {
            // write an element to make head segment stale
            final Event anotherEvent = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(
@@ -308,8 +326,7 @@ public class DeadLetterQueueWriterAgeRetentionTest {
            DLQEntry entry = new DLQEntry(anotherEvent, "", "", "00002", DeadLetterQueueReaderTest.constantSerializationLengthTimestamp(fakeClock));
            writeManager.writeEntry(entry);

-            // wait a cycle of flusher schedule
-            Thread.sleep(flushInterval.toMillis());
+            triggerExecutionOfFlush();

            // flusher should clean the expired segments
            Set<String> actual = listFileNames(dir);
@@ -317,9 +334,8 @@ public class DeadLetterQueueWriterAgeRetentionTest {
        }
    }

    @Test
-    public void testDLQWriterFlusherRemovesExpiredSegmentWhenCurrentHeadSegmentIsEmpty() throws IOException, InterruptedException {
+    public void testDLQWriterFlusherRemovesExpiredSegmentWhenCurrentHeadSegmentIsEmpty() throws IOException {
        final Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(
                Collections.singletonMap("message", "Not so important content"));
@@ -328,31 +344,38 @@ public class DeadLetterQueueWriterAgeRetentionTest {
        final ForwardableClock fakeClock = new ForwardableClock(pointInTimeFixedClock);

        Duration retainedPeriod = Duration.ofDays(1);
        Duration flushInterval = Duration.ofSeconds(1);
        try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter
-                .newBuilder(dir, 10 * MB, 1 * GB, flushInterval)
+                .newBuilderWithoutFlusher(dir, 10 * MB, 1 * GB)
                .retentionTime(retainedPeriod)
                .clock(fakeClock)
+                .flusherService(synchScheduler)
                .build()) {

            DLQEntry entry = new DLQEntry(event, "", "", "00001", DeadLetterQueueReaderTest.constantSerializationLengthTimestamp(fakeClock));
            writeManager.writeEntry(entry);

+            triggerExecutionOfFlush();
+
            // wait the flush interval so that the current head segment is sealed
            Awaitility.await("After the flush interval head segment is sealed and a fresh empty head is created")
-                    .atLeast(flushInterval)
-                    .atMost(Duration.ofMinutes(1))
+                    .atMost(Duration.ofSeconds(1))
                    .until(() -> Set.of("1.log", "2.log.tmp", ".lock").equals(listFileNames(dir)));

            // move forward the time so that the age policy is kicked in when the current head segment is empty
            fakeClock.forward(retainedPeriod.plusMinutes(2));

+            triggerExecutionOfFlush();
+
            // wait the flush period
            Awaitility.await("Remains the untouched head segment while the expired is removed")
                    // wait at least the flush period
-                    .atMost(Duration.ofMinutes(1))
+                    .atMost(Duration.ofSeconds(1))
                    // check the expired sealed segment is removed
                    .until(() -> Set.of("2.log.tmp", ".lock").equals(listFileNames(dir)));
        }
    }
+
+    private void triggerExecutionOfFlush() {
+        synchScheduler.executeAction();
+    }
}