mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-19 04:45:07 -04:00
Release InboundMessage and request instances earlier (#126998)
Follow-up to #126138. We can now release requst bytes directly after deserialization. Also, request instances need not go through a ref-counting cycle when forking, removing some contention from transport threads.
This commit is contained in:
parent
0f0f0ef596
commit
0d01f88f95
1 changed files with 13 additions and 5 deletions
|
@ -264,7 +264,7 @@ public class InboundHandler {
|
|||
Releasables.assertOnce(message.takeBreakerReleaseControl())
|
||||
);
|
||||
|
||||
try (message) {
|
||||
try {
|
||||
messageListener.onRequestReceived(requestId, action);
|
||||
if (reg != null) {
|
||||
reg.addRequestStats(header.getNetworkMessageSize() + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE);
|
||||
|
@ -278,9 +278,11 @@ public class InboundHandler {
|
|||
assert reg != null;
|
||||
final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput());
|
||||
assert assertRemoteVersion(stream, header.getVersion());
|
||||
final T request;
|
||||
T request;
|
||||
try {
|
||||
request = reg.newRequest(stream);
|
||||
message.close();
|
||||
message = null;
|
||||
} catch (Exception e) {
|
||||
assert ignoreDeserializationErrors : e;
|
||||
throw e;
|
||||
|
@ -295,13 +297,20 @@ public class InboundHandler {
|
|||
doHandleRequest(reg, request, transportChannel);
|
||||
}
|
||||
} else {
|
||||
handleRequestForking(request, reg, transportChannel);
|
||||
handleRequestForking(/* autocloses */ request, reg, transportChannel);
|
||||
request = null; // now owned by the thread we forked to
|
||||
}
|
||||
} finally {
|
||||
request.decRef();
|
||||
if (request != null) {
|
||||
request.decRef();
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
sendErrorResponse(action, transportChannel, e);
|
||||
} finally {
|
||||
if (message != null) {
|
||||
message.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -315,7 +324,6 @@ public class InboundHandler {
|
|||
|
||||
private <T extends TransportRequest> void handleRequestForking(T request, RequestHandlerRegistry<T> reg, TransportChannel channel) {
|
||||
boolean success = false;
|
||||
request.mustIncRef();
|
||||
try {
|
||||
reg.getExecutor().execute(threadPool.getThreadContext().preserveContextWithTracing(new AbstractRunnable() {
|
||||
@Override
|
||||
|
|
Loading…
Add table
Reference in a new issue