Skip to content

Commit

Permalink
Merge branch 'main' into vmodelid-in-payload
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Kadner <[email protected]>
  • Loading branch information
ckadner authored Nov 25, 2023
2 parents b3dfb38 + 582021e commit 5dc412f
Showing 1 changed file with 20 additions and 21 deletions.
41 changes: 20 additions & 21 deletions src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,7 @@ public void onHalfClose() {
String vModelId = null;
String requestId = null;
ModelResponse response = null;
ByteBuf responsePayload = null;
try (InterruptingListener cancelListener = newInterruptingListener()) {
if (logHeaders != null) {
logHeaders.addToMDC(headers); // MDC cleared in finally block
Expand Down Expand Up @@ -767,18 +768,20 @@ public void onHalfClose() {
} finally {
if (payloadProcessor != null) {
processPayload(reqMessage.readerIndex(reqReaderIndex),
requestId, resolvedModelId, vModelId, methodName, headers, null, true);
requestId, resolvedModelId, vModelId, methodName, headers, null);
} else {
releaseReqMessage();
}
reqMessage = null; // ownership released or transferred
}

respReaderIndex = response.data.readerIndex();
respSize = response.data.readableBytes();
call.sendHeaders(response.metadata);
if (payloadProcessor != null) {
responsePayload = response.data.retainedSlice();
}
call.sendMessage(response.data);
// response is released via ReleaseAfterResponse.releaseAll()
// final response refcount is released via ReleaseAfterResponse.releaseAll()
status = OK;
} catch (Exception e) {
status = toStatus(e);
Expand All @@ -795,17 +798,13 @@ public void onHalfClose() {
evictMethodDescriptor(methodName);
}
} finally {
final boolean releaseResponse = status != OK;
if (payloadProcessor != null) {
ByteBuf data = null;
Metadata metadata = null;
if (response != null) {
data = response.data.readerIndex(respReaderIndex);
metadata = response.metadata;
}
processPayload(data, requestId, resolvedModelId, vModelId, methodName, metadata, status, releaseResponse);
} else if (releaseResponse && response != null) {
response.release();
Metadata metadata = response != null ? response.metadata : null;
processPayload(responsePayload, requestId, resolvedModelId, vModelId, methodName, metadata, status);
}
if (status != OK && response != null) {
// An additional release is required if we call.sendMessage() wasn't sucessful
response.data.release();
}
ReleaseAfterResponse.releaseAll();
clearThreadLocals();
Expand All @@ -820,24 +819,23 @@ public void onHalfClose() {
}

/**
* Invoke PayloadProcessor on the request/response data
* Invoke PayloadProcessor on the request/response data. This method takes ownership
* of the passed-in {@code ByteBuf}.
*
* @param data the binary data
* @param payloadId the id of the request
* @param modelId the id of the model
* @param vModelId the id of the vModel
* @param methodName the name of the invoked method
* @param metadata the method name metadata
* @param status null for requests, non-null for responses
* @param takeOwnership whether the processor should take ownership
*/
private void processPayload(ByteBuf data, String payloadId, String vModelId, String modelId, String methodName,
Metadata metadata, io.grpc.Status status, boolean takeOwnership) {
private void processPayload(ByteBuf data, String payloadId, String modelId, String vModelId, String methodName,
Metadata metadata, io.grpc.Status status) {
Payload payload = null;
try {
assert payloadProcessor != null;
if (!takeOwnership) {
ReferenceCountUtil.retain(data);
}
payload = new Payload(payloadId, modelId, vModelId, methodName, metadata, data, status);
payload = new Payload(payloadId, modelId, methodName, metadata, data, status);
if (payloadProcessor.process(payload)) {
data = null; // ownership transferred
}
Expand Down Expand Up @@ -1200,6 +1198,7 @@ public void getVModelStatus(GetVModelStatusRequest request, StreamObserver<VMode
} finally {
clearThreadLocals();
}

}

@Override
Expand Down

0 comments on commit 5dc412f

Please sign in to comment.