mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
parent
6962fc2a06
commit
68b22280bc
1 changed files with 17 additions and 7 deletions
|
@ -768,16 +768,26 @@ public class Queue implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getAckedCount() {
|
public long getAckedCount() {
|
||||||
return headPage.ackedSeqNums.cardinality() + tailPages.stream()
|
lock.lock();
|
||||||
.mapToLong(page -> page.ackedSeqNums.cardinality())
|
try {
|
||||||
.sum();
|
return headPage.ackedSeqNums.cardinality() + tailPages.stream()
|
||||||
|
.mapToLong(page -> page.ackedSeqNums.cardinality()).sum();
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getUnackedCount() {
|
public long getUnackedCount() {
|
||||||
long headPageCount = (headPage.getElementCount() - headPage.ackedSeqNums.cardinality());
|
lock.lock();
|
||||||
long tailPagesCount = tailPages.stream()
|
try {
|
||||||
.mapToLong(page -> (page.getElementCount() - page.ackedSeqNums.cardinality())).sum();
|
long headPageCount = (headPage.getElementCount() - headPage.ackedSeqNums.cardinality());
|
||||||
return headPageCount + tailPagesCount;
|
long tailPagesCount = tailPages.stream()
|
||||||
|
.mapToLong(page -> (page.getElementCount() - page.ackedSeqNums.cardinality()))
|
||||||
|
.sum();
|
||||||
|
return headPageCount + tailPagesCount;
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected long nextSeqNum() {
|
protected long nextSeqNum() {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue