diff --git a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java index 0c437a17..14073fd0 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java @@ -725,6 +725,7 @@ public void onHalfClose() { String vModelId = null; String requestId = null; ModelResponse response = null; + ByteBuf responsePayload = null; try (InterruptingListener cancelListener = newInterruptingListener()) { if (logHeaders != null) { logHeaders.addToMDC(headers); // MDC cleared in finally block @@ -767,18 +768,20 @@ public void onHalfClose() { } finally { if (payloadProcessor != null) { processPayload(reqMessage.readerIndex(reqReaderIndex), - requestId, resolvedModelId, vModelId, methodName, headers, null, true); + requestId, resolvedModelId, vModelId, methodName, headers, null); } else { releaseReqMessage(); } reqMessage = null; // ownership released or transferred } - respReaderIndex = response.data.readerIndex(); respSize = response.data.readableBytes(); call.sendHeaders(response.metadata); + if (payloadProcessor != null) { + responsePayload = response.data.retainedSlice(); + } call.sendMessage(response.data); - // response is released via ReleaseAfterResponse.releaseAll() + // final response refcount is released via ReleaseAfterResponse.releaseAll() status = OK; } catch (Exception e) { status = toStatus(e); @@ -795,17 +798,13 @@ public void onHalfClose() { evictMethodDescriptor(methodName); } } finally { - final boolean releaseResponse = status != OK; if (payloadProcessor != null) { - ByteBuf data = null; - Metadata metadata = null; - if (response != null) { - data = response.data.readerIndex(respReaderIndex); - metadata = response.metadata; - } - processPayload(data, requestId, resolvedModelId, vModelId, methodName, metadata, status, releaseResponse); - } else if (releaseResponse && response != null) { - response.release(); + Metadata metadata = response != null ? response.metadata : null; + processPayload(responsePayload, requestId, resolvedModelId, vModelId, methodName, metadata, status); + } + if (status != OK && response != null) { + // An additional release is required if we call.sendMessage() wasn't sucessful + response.data.release(); } ReleaseAfterResponse.releaseAll(); clearThreadLocals(); @@ -820,24 +819,23 @@ public void onHalfClose() { } /** - * Invoke PayloadProcessor on the request/response data + * Invoke PayloadProcessor on the request/response data. This method takes ownership + * of the passed-in {@code ByteBuf}. + * * @param data the binary data * @param payloadId the id of the request * @param modelId the id of the model + * @param vModelId the id of the vModel * @param methodName the name of the invoked method * @param metadata the method name metadata * @param status null for requests, non-null for responses - * @param takeOwnership whether the processor should take ownership */ - private void processPayload(ByteBuf data, String payloadId, String vModelId, String modelId, String methodName, - Metadata metadata, io.grpc.Status status, boolean takeOwnership) { + private void processPayload(ByteBuf data, String payloadId, String modelId, String vModelId, String methodName, + Metadata metadata, io.grpc.Status status) { Payload payload = null; try { assert payloadProcessor != null; - if (!takeOwnership) { - ReferenceCountUtil.retain(data); - } - payload = new Payload(payloadId, modelId, vModelId, methodName, metadata, data, status); + payload = new Payload(payloadId, modelId, methodName, metadata, data, status); if (payloadProcessor.process(payload)) { data = null; // ownership transferred } @@ -1200,6 +1198,7 @@ public void getVModelStatus(GetVModelStatusRequest request, StreamObserver