This commit avoids unnecessary thread sync of PQ notFull state. (#14129) (#14142)

When PQ is full, workers wake up writer thread in every read.
However, without removing a fully acked page, queue is still full.
This commit changes the condition of notFull signal.
Fixed: #6801

(cherry picked from commit da68ff3803)

Co-authored-by: kaisecheng <69120390+kaisecheng@users.noreply.github.com>
This commit is contained in:
github-actions[bot] 2022-05-25 14:45:26 +01:00 committed by GitHub
parent a6cd4bf949
commit 27c512fb17
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -509,21 +509,25 @@ public final class Queue implements Closeable {
public boolean isFull() { public boolean isFull() {
lock.lock(); lock.lock();
try { try {
if (this.maxBytes > 0L && isMaxBytesReached()) { return isMaxBytesReached() || isMaxUnreadReached();
return true;
} else {
return (this.maxUnread > 0 && this.unreadCount >= this.maxUnread);
}
} finally { } finally {
lock.unlock(); lock.unlock();
} }
} }
private boolean isMaxBytesReached() { private boolean isMaxBytesReached() {
if (this.maxBytes <= 0L) {
return false;
}
final long persistedByteSize = getPersistedByteSize(); final long persistedByteSize = getPersistedByteSize();
return ((persistedByteSize > this.maxBytes) || (persistedByteSize == this.maxBytes && !this.headPage.hasSpace(1))); return ((persistedByteSize > this.maxBytes) || (persistedByteSize == this.maxBytes && !this.headPage.hasSpace(1)));
} }
private boolean isMaxUnreadReached() {
return this.maxUnread > 0 && (this.unreadCount >= this.maxUnread);
}
/** /**
* Queue is considered empty if it does not contain any tail page and the headpage has no element or all * Queue is considered empty if it does not contain any tail page and the headpage has no element or all
* elements are acked * elements are acked
@ -636,7 +640,7 @@ public final class Queue implements Closeable {
} }
if (! p.isFullyRead()) { if (! p.isFullyRead()) {
boolean wasFull = isFull(); boolean wasFull = isMaxUnreadReached();
final SequencedList<byte[]> serialized = p.read(left); final SequencedList<byte[]> serialized = p.read(left);
int n = serialized.getElements().size(); int n = serialized.getElements().size();