Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial invokeBindingList implementation #1068

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@

package io.dapr.actors.runtime;

import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.utils.TypeRef;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;

import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.utils.TypeRef;

/**
* Class used to test different serializer implementations.
Expand Down Expand Up @@ -57,6 +58,22 @@ public <T> T deserialize(byte[] data, TypeRef<T> type) throws IOException {
}
}

/**
* {@inheritDoc}
*/
@Override
public <T> List<T> deserializeList(byte[] data, TypeRef<T> type) throws IOException {
try (ByteArrayInputStream bis = new ByteArrayInputStream(data)) {
try (ObjectInputStream ois = new ObjectInputStream(bis)) {
try {
return (List<T>) ois.readObject();
} catch (Exception e) {
throw new IOException("Could not deserialize Java object.", e);
}
}
}
}

/**
* {@inheritDoc}
*/
Expand Down
18 changes: 18 additions & 0 deletions sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ public <T> T deserialize(byte[] data, TypeRef<T> type) throws IOException {
public String getContentType() {
return "application/json";
}

@Override
public <T> List<T> deserializeList(byte[] data, TypeRef<T> type) throws IOException {
//Not needed here
throw new UnsupportedOperationException("Not supported yet.");
}
};
try (DaprClient client = new DaprClientBuilder().withObjectSerializer(serializer).build();
DaprPreviewClient previewClient = new DaprClientBuilder().withObjectSerializer(serializer).buildPreviewClient()) {
Expand Down Expand Up @@ -279,6 +285,12 @@ public <T> T deserialize(byte[] data, TypeRef<T> type) throws IOException {
public String getContentType() {
return "application/json";
}

@Override
public <T> List<T> deserializeList(byte[] data, TypeRef<T> type) throws IOException {
//Not needed here
throw new UnsupportedOperationException("Not supported yet.");
}
};

// Send a batch of messages on one topic
Expand Down Expand Up @@ -493,6 +505,12 @@ public <T> T deserialize(byte[] data, TypeRef<T> type) {
public String getContentType() {
return "application/octet-stream";
}

@Override
public <T> List<T> deserializeList(byte[] data, TypeRef<T> type) throws IOException {
//Not needed here
throw new UnsupportedOperationException("Not supported yet.");
}
};
try (DaprClient client = new DaprClientBuilder().withObjectSerializer(serializer).build()) {
client.publishEvent(
Expand Down
14 changes: 13 additions & 1 deletion sdk/src/main/java/io/dapr/client/AbstractDaprClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@
import io.dapr.utils.TypeRef;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;


/**
* Abstract class with convenient methods common between client implementations.
*
Expand Down Expand Up @@ -270,6 +270,18 @@ public <T> Mono<T> invokeBinding(
return this.invokeBinding(bindingName, operation, data, metadata, TypeRef.get(clazz));
}

/**
* {@inheritDoc}
*/
@Override
public <T> Mono<List<T>> invokeBindingList(
String bindingName, String operation, Object data, Map<String, String> metadata, TypeRef<T> type) {
InvokeBindingRequest request = new InvokeBindingRequest(bindingName, operation)
.setData(data)
.setMetadata(metadata);
return this.invokeBindingList(request, type);
}

/**
* {@inheritDoc}
*/
Expand Down
29 changes: 27 additions & 2 deletions sdk/src/main/java/io/dapr/client/DaprClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.dapr.client;


import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.DaprMetadata;
import io.dapr.client.domain.DeleteStateRequest;
Expand All @@ -39,11 +40,11 @@
import io.grpc.stub.AbstractStub;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Map;
import java.util.function.Function;


/**
* Generic Client Adapter to be used regardless of the GRPC or the HTTP Client implementation required.
*
Expand Down Expand Up @@ -299,7 +300,7 @@ Mono<byte[]> invokeMethod(String appId, String methodName, byte[] request, HttpE
*/
<T> Mono<T> invokeBinding(String bindingName, String operation, Object data, Map<String, String> metadata,
TypeRef<T> type);

/**
* Invokes a Binding operation.
*
Expand All @@ -324,6 +325,30 @@ <T> Mono<T> invokeBinding(String bindingName, String operation, Object data, Map
*/
<T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type);

/**
* Invokes a Binding operation that returns a List.
*
* @param request The binding invocation request.
* @param type The type being returned.
* @param <T> The type of the return
* @return a Mono plan of type List.
*/
<T> Mono<List<T>> invokeBindingList(InvokeBindingRequest request, TypeRef<T> type);

/**
* Invokes a Binding operation and expect a list of objects as return.
*
* @param bindingName The name of the biding to call.
* @param operation The operation to be performed by the binding request processor.
* @param data The data to be processed, use byte[] to skip serialization.
* @param metadata The metadata map.
* @param type The type being returned.
* @param <T> The type of the return
* @return a Mono plan of type List.
*/
<T> Mono<List<T>> invokeBindingList(String bindingName, String operation, Object data, Map<String, String> metadata,
TypeRef<T> type);

/**
* Retrieve a State based on their key.
*
Expand Down
48 changes: 48 additions & 0 deletions sdk/src/main/java/io/dapr/client/DaprClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,54 @@ private <T> Mono<T> getMonoForHttpResponse(TypeRef<T> type, DaprHttp.Response r)
}
}


/**
* {@inheritDoc}
*/
@Override
public <T> Mono<List<T>> invokeBindingList(InvokeBindingRequest request, TypeRef<T> type) {
try {
final String name = request.getName();
final String operation = request.getOperation();
final Object data = request.getData();
final Map<String, String> metadata = request.getMetadata();
if (name == null || name.trim().isEmpty()) {
throw new IllegalArgumentException("Binding name cannot be null or empty.");
}

if (operation == null || operation.trim().isEmpty()) {
throw new IllegalArgumentException("Binding operation cannot be null or empty.");
}

byte[] byteData = objectSerializer.serialize(data);
DaprProtos.InvokeBindingRequest.Builder builder = DaprProtos.InvokeBindingRequest.newBuilder()
.setName(name).setOperation(operation);
if (byteData != null) {
builder.setData(ByteString.copyFrom(byteData));
}
if (metadata != null) {
builder.putAllMetadata(metadata);
}
DaprProtos.InvokeBindingRequest envelope = builder.build();

return Mono.deferContextual(
context -> this.<DaprProtos.InvokeBindingResponse>createMono(
it -> intercept(context, asyncStub).invokeBinding(envelope, it)
)
).flatMap(
it -> {
try {
return Mono.justOrEmpty(objectSerializer.deserializeList(it.getData().toByteArray(), type));
} catch (IOException e) {
throw DaprException.propagate(e);
}
}
);
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
}

/**
* {@inheritDoc}
*/
Expand Down
45 changes: 43 additions & 2 deletions sdk/src/main/java/io/dapr/client/ObjectSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@
package io.dapr.client;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.MessageLite;
import io.dapr.client.domain.CloudEvent;
import io.dapr.utils.TypeRef;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/**
* Serializes and deserializes an internal object.
Expand Down Expand Up @@ -137,10 +139,49 @@ private <T> T deserialize(byte[] content, JavaType javaType) throws IOException
throw new IOException(e);
}
}

return OBJECT_MAPPER.readValue(content, javaType);
}

/**
* Deserializes the byte array into the a list of the clazz paramter type object.
*
* @param content Content to be parsed.
* @param clazz Type of the object being deserialized.
* @param <T> Generic type of the object being deserialized.
* @return Object of type T.
* @throws IOException In case content cannot be deserialized.
*/
public <T> List<T> deserializeList(byte[] content, Class<T> clazz) throws IOException {
List<List<String>> deserialized = deserialize(content,
OBJECT_MAPPER.constructType(new TypeReference<List<List<String>>>(){}));
List<T> results = new ArrayList<>(deserialized.size());
for (List<String> l : deserialized) {
results.add(OBJECT_MAPPER.readValue(l.get(0), clazz));
}

return results;
}

/**
* Deserializes the byte array into a collection containing the specified object.
*
* @param content Content to be parsed.
* @param type Type of the object being deserialized.
* @param <T> Generic type of the object being deserialized.
* @return List of Objects of type T.
* @throws IOException In case content cannot be deserialized.
*/
@SuppressWarnings("unchecked")
public <T> List<T> deserializeList(byte[] content, TypeRef<T> type) throws IOException {
try {
Class<T> clazz = (Class<T>) Class.forName(type.getType().getTypeName());
return deserializeList(content, clazz);
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
}

/**
* Parses the JSON content into a node for fine-grained processing.
*
Expand Down
14 changes: 13 additions & 1 deletion sdk/src/main/java/io/dapr/serializer/DaprObjectSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
package io.dapr.serializer;

import io.dapr.utils.TypeRef;

import java.io.IOException;
import java.util.List;


/**
* Serializes and deserializes application's objects.
Expand All @@ -42,6 +43,17 @@ public interface DaprObjectSerializer {
*/
<T> T deserialize(byte[] data, TypeRef<T> type) throws IOException;

/**
* Deserializes the given byte[] into a list of object.
*
* @param data Data to be deserialized.
* @param type Type of object to be deserialized.
* @param <T> Type of object to be deserialized.
* @return Deserialized List of objects.
* @throws IOException If cannot deserialize object.
*/
<T> List<T> deserializeList(byte[] data, TypeRef<T> type) throws IOException;

/**
* Returns the content type of the request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@

import io.dapr.client.ObjectSerializer;
import io.dapr.utils.TypeRef;

import java.io.IOException;
import java.util.List;


/**
* Default serializer/deserializer for request/response objects and for state objects too.
Expand All @@ -39,6 +40,14 @@ public <T> T deserialize(byte[] data, TypeRef<T> type) throws IOException {
return super.deserialize(data, type);
}

/**
* {@inheritDoc}
*/
@Override
public <T> List<T> deserializeList(byte[] data, TypeRef<T> type) throws IOException {
return super.deserializeList(data, type);
}

/**
* {@inheritDoc}
*/
Expand Down
Loading
Loading