From 9cafe943070b5b5d1745b00b93cf3fdac7455617 Mon Sep 17 00:00:00 2001 From: salaboy Date: Mon, 8 Jul 2024 09:10:07 +0100 Subject: [PATCH 1/5] initial invokeBindingList implementation Signed-off-by: salaboy --- .../dapr/actors/runtime/JavaSerializer.java | 23 +++++++-- .../io/dapr/client/AbstractDaprClient.java | 27 ++++++++--- .../main/java/io/dapr/client/DaprClient.java | 34 +++++++++++-- .../java/io/dapr/client/DaprClientImpl.java | 48 +++++++++++++++++++ .../java/io/dapr/client/ObjectSerializer.java | 46 ++++++++++++++++-- .../dapr/serializer/DaprObjectSerializer.java | 16 ++++++- .../serializer/DefaultObjectSerializer.java | 12 ++++- .../io/dapr/client/DaprClientGrpcTest.java | 21 ++++++++ .../io/dapr/client/DaprClientHttpTest.java | 5 ++ 9 files changed, 210 insertions(+), 22 deletions(-) diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/JavaSerializer.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/JavaSerializer.java index 28c39e8b8..7a11e673b 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/JavaSerializer.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/JavaSerializer.java @@ -13,14 +13,15 @@ package io.dapr.actors.runtime; -import io.dapr.serializer.DaprObjectSerializer; -import io.dapr.utils.TypeRef; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.util.List; + +import io.dapr.serializer.DaprObjectSerializer; +import io.dapr.utils.TypeRef; /** * Class used to test different serializer implementations. @@ -57,6 +58,22 @@ public T deserialize(byte[] data, TypeRef type) throws IOException { } } + /** + * {@inheritDoc} + */ + @Override + public List deserializeList(byte[] data, TypeRef type) throws IOException { + try (ByteArrayInputStream bis = new ByteArrayInputStream(data)) { + try (ObjectInputStream ois = new ObjectInputStream(bis)) { + try { + return (List) ois.readObject(); + } catch (Exception e) { + throw new IOException("Could not deserialize Java object.", e); + } + } + } + } + /** * {@inheritDoc} */ diff --git a/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java b/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java index 61a7ad5c1..dd8003317 100644 --- a/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java +++ b/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java @@ -13,7 +13,15 @@ package io.dapr.client; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + import com.fasterxml.jackson.databind.ObjectMapper; + import io.dapr.client.domain.BulkPublishEntry; import io.dapr.client.domain.BulkPublishRequest; import io.dapr.client.domain.BulkPublishResponse; @@ -48,13 +56,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - /** * Abstract class with convenient methods common between client implementations. * @@ -270,6 +271,18 @@ public Mono invokeBinding( return this.invokeBinding(bindingName, operation, data, metadata, TypeRef.get(clazz)); } + /** + * {@inheritDoc} + */ + @Override + public Mono> invokeBindingList( + String bindingName, String operation, Object data, Map metadata, TypeRef type) { + InvokeBindingRequest request = new InvokeBindingRequest(bindingName, operation) + .setData(data) + .setMetadata(metadata); + return this.invokeBindingList(request, type); + } + /** * {@inheritDoc} */ diff --git a/sdk/src/main/java/io/dapr/client/DaprClient.java b/sdk/src/main/java/io/dapr/client/DaprClient.java index 9b713f7c7..2d7203ec2 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprClient.java @@ -13,6 +13,10 @@ package io.dapr.client; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + import io.dapr.client.domain.ConfigurationItem; import io.dapr.client.domain.DaprMetadata; import io.dapr.client.domain.DeleteStateRequest; @@ -40,10 +44,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.List; -import java.util.Map; -import java.util.function.Function; - /** * Generic Client Adapter to be used regardless of the GRPC or the HTTP Client implementation required. * @@ -299,7 +299,7 @@ Mono invokeMethod(String appId, String methodName, byte[] request, HttpE */ Mono invokeBinding(String bindingName, String operation, Object data, Map metadata, TypeRef type); - + /** * Invokes a Binding operation. * @@ -324,6 +324,30 @@ Mono invokeBinding(String bindingName, String operation, Object data, Map */ Mono invokeBinding(InvokeBindingRequest request, TypeRef type); + /** + * Invokes a Binding operation that returns a List. + * + * @param request The binding invocation request. + * @param type The type being returned. + * @param The type of the return + * @return a Mono plan of type List. + */ + Mono> invokeBindingList(InvokeBindingRequest request, TypeRef type); + + /** + * Invokes a Binding operation and expect a list of objects as return. + * + * @param bindingName The name of the biding to call. + * @param operation The operation to be performed by the binding request processor. + * @param data The data to be processed, use byte[] to skip serialization. + * @param metadata The metadata map. + * @param type The type being returned. + * @param The type of the return + * @return a Mono plan of type List. + */ + Mono> invokeBindingList(String bindingName, String operation, Object data, Map metadata, + TypeRef type); + /** * Retrieve a State based on their key. * diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index aabc07a60..a7a26fcd9 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -457,6 +457,54 @@ private Mono getMonoForHttpResponse(TypeRef type, DaprHttp.Response r) } } + + /** + * {@inheritDoc} + */ + @Override + public Mono> invokeBindingList(InvokeBindingRequest request, TypeRef type) { + try { + final String name = request.getName(); + final String operation = request.getOperation(); + final Object data = request.getData(); + final Map metadata = request.getMetadata(); + if (name == null || name.trim().isEmpty()) { + throw new IllegalArgumentException("Binding name cannot be null or empty."); + } + + if (operation == null || operation.trim().isEmpty()) { + throw new IllegalArgumentException("Binding operation cannot be null or empty."); + } + + byte[] byteData = objectSerializer.serialize(data); + DaprProtos.InvokeBindingRequest.Builder builder = DaprProtos.InvokeBindingRequest.newBuilder() + .setName(name).setOperation(operation); + if (byteData != null) { + builder.setData(ByteString.copyFrom(byteData)); + } + if (metadata != null) { + builder.putAllMetadata(metadata); + } + DaprProtos.InvokeBindingRequest envelope = builder.build(); + + return Mono.deferContextual( + context -> this.createMono( + it -> intercept(context, asyncStub).invokeBinding(envelope, it) + ) + ).flatMap( + it -> { + try { + return Mono.justOrEmpty(objectSerializer.deserializeList(it.getData().toByteArray(), type)); + } catch (IOException e) { + throw DaprException.propagate(e); + } + } + ); + } catch (Exception ex) { + return DaprException.wrapMono(ex); + } + } + /** * {@inheritDoc} */ diff --git a/sdk/src/main/java/io/dapr/client/ObjectSerializer.java b/sdk/src/main/java/io/dapr/client/ObjectSerializer.java index a131060b9..0ae777308 100644 --- a/sdk/src/main/java/io/dapr/client/ObjectSerializer.java +++ b/sdk/src/main/java/io/dapr/client/ObjectSerializer.java @@ -13,18 +13,21 @@ package io.dapr.client; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.List; + import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.MessageLite; + import io.dapr.client.domain.CloudEvent; import io.dapr.utils.TypeRef; -import java.io.IOException; -import java.lang.reflect.Method; - /** * Serializes and deserializes an internal object. */ @@ -99,6 +102,41 @@ public T deserialize(byte[] content, Class clazz) throws IOException { return deserialize(content, OBJECT_MAPPER.constructType(clazz)); } + /** + * Deserializes the byte array into the a list of the clazz paramter type object. + * + * @param content Content to be parsed. + * @param clazz Type of the object being deserialized. + * @param Generic type of the object being deserialized. + * @return Object of type T. + * @throws IOException In case content cannot be deserialized. + */ + @SuppressWarnings("unchecked") + public List deserializeList(byte[] content, Class clazz) throws IOException { + Class clazzArray = ((T[]) java.lang.reflect.Array.newInstance(clazz, 1)).getClass(); + return Arrays.asList(deserialize(content, OBJECT_MAPPER.constructType(clazzArray))); + } + + /** + * Deserializes the byte array into the original object. + * + * @param content Content to be parsed. + * @param type Type of the object being deserialized. + * @param Generic type of the object being deserialized. + * @return Object of type T. + * @throws IOException In case content cannot be deserialized. + */ + @SuppressWarnings("unchecked") + public List deserializeList(byte[] content, TypeRef type) throws IOException { + try { + Class clazz = (Class) Class.forName(type.getType().getTypeName()); + return deserializeList(content, clazz); + }catch (Exception e){ + e.printStackTrace(); + } + return null; + } + private T deserialize(byte[] content, JavaType javaType) throws IOException { if ((javaType == null) || javaType.isTypeOrSubTypeOf(Void.class)) { return null; @@ -137,7 +175,7 @@ private T deserialize(byte[] content, JavaType javaType) throws IOException throw new IOException(e); } } - + return OBJECT_MAPPER.readValue(content, javaType); } diff --git a/sdk/src/main/java/io/dapr/serializer/DaprObjectSerializer.java b/sdk/src/main/java/io/dapr/serializer/DaprObjectSerializer.java index 6cd1af65c..55597dd5c 100644 --- a/sdk/src/main/java/io/dapr/serializer/DaprObjectSerializer.java +++ b/sdk/src/main/java/io/dapr/serializer/DaprObjectSerializer.java @@ -13,9 +13,10 @@ package io.dapr.serializer; -import io.dapr.utils.TypeRef; - import java.io.IOException; +import java.util.List; + +import io.dapr.utils.TypeRef; /** * Serializes and deserializes application's objects. @@ -42,6 +43,17 @@ public interface DaprObjectSerializer { */ T deserialize(byte[] data, TypeRef type) throws IOException; + /** + * Deserializes the given byte[] into a list of object. + * + * @param data Data to be deserialized. + * @param type Type of object to be deserialized. + * @param Type of object to be deserialized. + * @return Deserialized List of objects. + * @throws IOException If cannot deserialize object. + */ + List deserializeList(byte[] data, TypeRef type) throws IOException; + /** * Returns the content type of the request. * diff --git a/sdk/src/main/java/io/dapr/serializer/DefaultObjectSerializer.java b/sdk/src/main/java/io/dapr/serializer/DefaultObjectSerializer.java index 3527f2e7a..467559669 100644 --- a/sdk/src/main/java/io/dapr/serializer/DefaultObjectSerializer.java +++ b/sdk/src/main/java/io/dapr/serializer/DefaultObjectSerializer.java @@ -13,10 +13,12 @@ package io.dapr.serializer; +import java.io.IOException; +import java.util.List; + import io.dapr.client.ObjectSerializer; import io.dapr.utils.TypeRef; -import java.io.IOException; /** * Default serializer/deserializer for request/response objects and for state objects too. @@ -39,6 +41,14 @@ public T deserialize(byte[] data, TypeRef type) throws IOException { return super.deserialize(data, type); } + /** + * {@inheritDoc} + */ + @Override + public List deserializeList(byte[] data, TypeRef type) throws IOException { + return super.deserializeList(data, type); + } + /** * {@inheritDoc} */ diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java index 1081220a9..cb2e16b47 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java @@ -406,6 +406,27 @@ public void invokeBindingResponseObjectTypeRefTest() throws IOException { assertEquals("OK", result.block()); } + @Test + public void invokeBindingListResponseObjectTypeRefTest() throws IOException { + List list = new ArrayList<>(); + MyObject obj1 = new MyObject(1, "Object1"); + MyObject obj2 = new MyObject(2, "Objet2"); + list.add(obj1); + list.add(obj2); + DaprProtos.InvokeBindingResponse.Builder responseBuilder = + DaprProtos.InvokeBindingResponse.newBuilder().setData(serialize(list)); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseBuilder.build()); + observer.onCompleted(); + return null; + }).when(daprStub).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); + + Mono> result = client.invokeBindingList("BindingName", "MyOperation", null, null, TypeRef.get(MyObject.class)); + + assertEquals(list, result.block()); + } + @Test public void invokeBindingObjectNoHotMono() throws IOException { AtomicBoolean called = new AtomicBoolean(false); diff --git a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java index 48b7cd0ac..0350045a3 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java @@ -487,6 +487,11 @@ public T deserialize(byte[] data, TypeRef type) throws IOException { return XML_MAPPER.readValue(data, new TypeReference() {}); } + @Override + public List deserializeList(byte[] data, TypeRef type) throws IOException { + return XML_MAPPER.readValue(data, new TypeReference>() {}); + } + @Override public String getContentType() { return "application/xml"; From 7dec5021c0232d65e94a3f56ad0d090b115612d2 Mon Sep 17 00:00:00 2001 From: salaboy Date: Mon, 8 Jul 2024 09:46:01 +0100 Subject: [PATCH 2/5] fixing checkstyle Signed-off-by: salaboy --- .../io/dapr/client/AbstractDaprClient.java | 15 ++-- .../main/java/io/dapr/client/DaprClient.java | 7 +- .../java/io/dapr/client/ObjectSerializer.java | 79 +++++++++---------- .../dapr/serializer/DaprObjectSerializer.java | 2 +- .../serializer/DefaultObjectSerializer.java | 5 +- 5 files changed, 52 insertions(+), 56 deletions(-) diff --git a/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java b/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java index dd8003317..faaccaa2f 100644 --- a/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java +++ b/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java @@ -13,15 +13,7 @@ package io.dapr.client; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - import com.fasterxml.jackson.databind.ObjectMapper; - import io.dapr.client.domain.BulkPublishEntry; import io.dapr.client.domain.BulkPublishRequest; import io.dapr.client.domain.BulkPublishResponse; @@ -55,6 +47,13 @@ import io.dapr.utils.TypeRef; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + /** * Abstract class with convenient methods common between client implementations. diff --git a/sdk/src/main/java/io/dapr/client/DaprClient.java b/sdk/src/main/java/io/dapr/client/DaprClient.java index 2d7203ec2..fe469344d 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprClient.java @@ -13,9 +13,6 @@ package io.dapr.client; -import java.util.List; -import java.util.Map; -import java.util.function.Function; import io.dapr.client.domain.ConfigurationItem; import io.dapr.client.domain.DaprMetadata; @@ -43,6 +40,10 @@ import io.grpc.stub.AbstractStub; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + /** * Generic Client Adapter to be used regardless of the GRPC or the HTTP Client implementation required. diff --git a/sdk/src/main/java/io/dapr/client/ObjectSerializer.java b/sdk/src/main/java/io/dapr/client/ObjectSerializer.java index 0ae777308..b8cb3e506 100644 --- a/sdk/src/main/java/io/dapr/client/ObjectSerializer.java +++ b/sdk/src/main/java/io/dapr/client/ObjectSerializer.java @@ -13,20 +13,18 @@ package io.dapr.client; -import java.io.IOException; -import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.List; - import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.MessageLite; - import io.dapr.client.domain.CloudEvent; import io.dapr.utils.TypeRef; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.List; /** * Serializes and deserializes an internal object. @@ -102,41 +100,6 @@ public T deserialize(byte[] content, Class clazz) throws IOException { return deserialize(content, OBJECT_MAPPER.constructType(clazz)); } - /** - * Deserializes the byte array into the a list of the clazz paramter type object. - * - * @param content Content to be parsed. - * @param clazz Type of the object being deserialized. - * @param Generic type of the object being deserialized. - * @return Object of type T. - * @throws IOException In case content cannot be deserialized. - */ - @SuppressWarnings("unchecked") - public List deserializeList(byte[] content, Class clazz) throws IOException { - Class clazzArray = ((T[]) java.lang.reflect.Array.newInstance(clazz, 1)).getClass(); - return Arrays.asList(deserialize(content, OBJECT_MAPPER.constructType(clazzArray))); - } - - /** - * Deserializes the byte array into the original object. - * - * @param content Content to be parsed. - * @param type Type of the object being deserialized. - * @param Generic type of the object being deserialized. - * @return Object of type T. - * @throws IOException In case content cannot be deserialized. - */ - @SuppressWarnings("unchecked") - public List deserializeList(byte[] content, TypeRef type) throws IOException { - try { - Class clazz = (Class) Class.forName(type.getType().getTypeName()); - return deserializeList(content, clazz); - }catch (Exception e){ - e.printStackTrace(); - } - return null; - } - private T deserialize(byte[] content, JavaType javaType) throws IOException { if ((javaType == null) || javaType.isTypeOrSubTypeOf(Void.class)) { return null; @@ -179,6 +142,40 @@ private T deserialize(byte[] content, JavaType javaType) throws IOException return OBJECT_MAPPER.readValue(content, javaType); } + /** + * Deserializes the byte array into the a list of the clazz paramter type object. + * + * @param content Content to be parsed. + * @param clazz Type of the object being deserialized. + * @param Generic type of the object being deserialized. + * @return Object of type T. + * @throws IOException In case content cannot be deserialized. + */ + @SuppressWarnings("unchecked") + public List deserializeList(byte[] content, Class clazz) throws IOException { + Class clazzArray = ((T[]) java.lang.reflect.Array.newInstance(clazz, 1)).getClass(); + return Arrays.asList(deserialize(content, OBJECT_MAPPER.constructType(clazzArray))); + } + + /** + * Deserializes the byte array into a collection containing the specified object. + * + * @param content Content to be parsed. + * @param type Type of the object being deserialized. + * @param Generic type of the object being deserialized. + * @return List of Objects of type T. + * @throws IOException In case content cannot be deserialized. + */ + @SuppressWarnings("unchecked") + public List deserializeList(byte[] content, TypeRef type) throws IOException { + try { + Class clazz = (Class) Class.forName(type.getType().getTypeName()); + return deserializeList(content, clazz); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + } + /** * Parses the JSON content into a node for fine-grained processing. * diff --git a/sdk/src/main/java/io/dapr/serializer/DaprObjectSerializer.java b/sdk/src/main/java/io/dapr/serializer/DaprObjectSerializer.java index 55597dd5c..2a095620a 100644 --- a/sdk/src/main/java/io/dapr/serializer/DaprObjectSerializer.java +++ b/sdk/src/main/java/io/dapr/serializer/DaprObjectSerializer.java @@ -13,10 +13,10 @@ package io.dapr.serializer; +import io.dapr.utils.TypeRef; import java.io.IOException; import java.util.List; -import io.dapr.utils.TypeRef; /** * Serializes and deserializes application's objects. diff --git a/sdk/src/main/java/io/dapr/serializer/DefaultObjectSerializer.java b/sdk/src/main/java/io/dapr/serializer/DefaultObjectSerializer.java index 467559669..9ca45a437 100644 --- a/sdk/src/main/java/io/dapr/serializer/DefaultObjectSerializer.java +++ b/sdk/src/main/java/io/dapr/serializer/DefaultObjectSerializer.java @@ -13,11 +13,10 @@ package io.dapr.serializer; -import java.io.IOException; -import java.util.List; - import io.dapr.client.ObjectSerializer; import io.dapr.utils.TypeRef; +import java.io.IOException; +import java.util.List; /** From 4d10747560014de3efc2f2e6b17e426fc473e50d Mon Sep 17 00:00:00 2001 From: salaboy Date: Mon, 8 Jul 2024 09:58:06 +0100 Subject: [PATCH 3/5] fixing test impl Signed-off-by: salaboy --- .../java/io/dapr/it/pubsub/http/PubSubIT.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java index 347bd64de..28e5e31fa 100644 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java @@ -150,6 +150,12 @@ public T deserialize(byte[] data, TypeRef type) throws IOException { public String getContentType() { return "application/json"; } + + @Override + public List deserializeList(byte[] data, TypeRef type) throws IOException { + //Not needed here + throw new UnsupportedOperationException("Not supported yet."); + } }; try (DaprClient client = new DaprClientBuilder().withObjectSerializer(serializer).build(); DaprPreviewClient previewClient = new DaprClientBuilder().withObjectSerializer(serializer).buildPreviewClient()) { @@ -279,6 +285,12 @@ public T deserialize(byte[] data, TypeRef type) throws IOException { public String getContentType() { return "application/json"; } + + @Override + public List deserializeList(byte[] data, TypeRef type) throws IOException { + //Not needed here + throw new UnsupportedOperationException("Not supported yet."); + } }; // Send a batch of messages on one topic @@ -488,11 +500,27 @@ public byte[] serialize(Object o) { public T deserialize(byte[] data, TypeRef type) { return (T) data; } +) { + @Override + public byte[] serialize(Object o) { + return (byte[])o; + } + + @Override + public T deserialize(byte[] data, TypeRef type) { + return (T) data; + } @Override public String getContentType() { return "application/octet-stream"; } + + @Override + public List deserializeList(byte[] data, TypeRef type) throws IOException { + //Not needed here + throw new UnsupportedOperationException("Not supported yet."); + } }; try (DaprClient client = new DaprClientBuilder().withObjectSerializer(serializer).build()) { client.publishEvent( From 2dbe549a771c26f89d0fd7cf50aa85098405e05d Mon Sep 17 00:00:00 2001 From: salaboy Date: Mon, 8 Jul 2024 10:04:38 +0100 Subject: [PATCH 4/5] added implementations? Signed-off-by: salaboy --- .../src/test/java/io/dapr/it/pubsub/http/PubSubIT.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java index 28e5e31fa..9f0783043 100644 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java @@ -500,16 +500,6 @@ public byte[] serialize(Object o) { public T deserialize(byte[] data, TypeRef type) { return (T) data; } -) { - @Override - public byte[] serialize(Object o) { - return (byte[])o; - } - - @Override - public T deserialize(byte[] data, TypeRef type) { - return (T) data; - } @Override public String getContentType() { From 9b838d89853c7314d0f79590e9288ff9d7d730a9 Mon Sep 17 00:00:00 2001 From: salaboy Date: Tue, 9 Jul 2024 11:02:38 +0100 Subject: [PATCH 5/5] updating test with new serialization to follow postgresql format Signed-off-by: salaboy --- .../java/io/dapr/client/ObjectSerializer.java | 14 +++-- .../io/dapr/client/DaprClientGrpcTest.java | 59 ++++++++++++++++--- 2 files changed, 60 insertions(+), 13 deletions(-) diff --git a/sdk/src/main/java/io/dapr/client/ObjectSerializer.java b/sdk/src/main/java/io/dapr/client/ObjectSerializer.java index b8cb3e506..369f74148 100644 --- a/sdk/src/main/java/io/dapr/client/ObjectSerializer.java +++ b/sdk/src/main/java/io/dapr/client/ObjectSerializer.java @@ -14,6 +14,7 @@ package io.dapr.client; import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.JsonNode; @@ -23,7 +24,7 @@ import io.dapr.utils.TypeRef; import java.io.IOException; import java.lang.reflect.Method; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; /** @@ -151,10 +152,15 @@ private T deserialize(byte[] content, JavaType javaType) throws IOException * @return Object of type T. * @throws IOException In case content cannot be deserialized. */ - @SuppressWarnings("unchecked") public List deserializeList(byte[] content, Class clazz) throws IOException { - Class clazzArray = ((T[]) java.lang.reflect.Array.newInstance(clazz, 1)).getClass(); - return Arrays.asList(deserialize(content, OBJECT_MAPPER.constructType(clazzArray))); + List> deserialized = deserialize(content, + OBJECT_MAPPER.constructType(new TypeReference>>(){})); + List results = new ArrayList<>(deserialized.size()); + for (List l : deserialized) { + results.add(OBJECT_MAPPER.readValue(l.get(0), clazz)); + } + + return results; } /** diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java index cb2e16b47..6325edb37 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java @@ -13,9 +13,11 @@ package io.dapr.client; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; + import io.dapr.client.domain.AppConnectionPropertiesHealthMetadata; import io.dapr.client.domain.AppConnectionPropertiesMetadata; import io.dapr.client.domain.ComponentMetadata; @@ -52,6 +54,7 @@ import io.grpc.StatusRuntimeException; import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -59,6 +62,7 @@ import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatchers; import org.mockito.stubbing.Answer; + import reactor.core.publisher.Mono; import java.io.IOException; @@ -75,6 +79,7 @@ import java.util.stream.Collectors; import static io.dapr.utils.TestUtils.assertThrowsDaprException; + import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -90,6 +95,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.fasterxml.jackson.core.type.TypeReference; + public class DaprClientGrpcTest { private static final String STATE_STORE_NAME = "MyStateStore"; @@ -406,15 +413,45 @@ public void invokeBindingResponseObjectTypeRefTest() throws IOException { assertEquals("OK", result.block()); } + @Test - public void invokeBindingListResponseObjectTypeRefTest() throws IOException { - List list = new ArrayList<>(); - MyObject obj1 = new MyObject(1, "Object1"); - MyObject obj2 = new MyObject(2, "Objet2"); - list.add(obj1); - list.add(obj2); + public void invokeBindingResponseListObjectTypeRefTest() throws IOException { + //The output binding needs to be an array of arrays containing strings intead of JSON objects if we want to be able to parse it + ByteString listOfObjects = ByteString.copyFromUtf8("[[\"{ \\\"id\\\": 1,\\\"value\\\": \\\"Object1\\\"}\"],[\"{ \\\"id\\\": 2,\\\"value\\\": \\\"Object2\\\"}\"]]"); + DaprProtos.InvokeBindingResponse.Builder responseBuilder = + DaprProtos.InvokeBindingResponse.newBuilder().setData(listOfObjects); + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseBuilder.build()); + observer.onCompleted(); + return null; + }).when(daprStub).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); + + MyObject event = new MyObject(1, "Event"); + List> result = client.invokeBinding("BindingName", "MyOperation", event, new TypeRef>>(){}).block(); + ObjectMapper mapper = new ObjectMapper(); + List myObjects = new ArrayList<>(result.size()); + for(List l : result){ + myObjects.add(mapper.readValue(l.get(0), MyObject.class)); + } + + assertEquals(2, myObjects.size()); + assertEquals(1, myObjects.get(0).getId()); + assertEquals("Object1", myObjects.get(0).getValue()); + assertEquals(2, myObjects.get(1).getId()); + assertEquals("Object2", myObjects.get(1).getValue()); + + } + + @Test + public void invokeBindingListResponseWeirdArrayTypeRefTest() throws IOException { + + // The binding must return the values list only, the keys are not needed, as the Keys for Java are the field inside the object + // The content of the objects should be provided as a string with internal quotes + ByteString listOfObjects = ByteString.copyFromUtf8("[[\"{ \\\"id\\\": 1,\\\"value\\\": \\\"Object1\\\"}\"],[\"{ \\\"id\\\": 2,\\\"value\\\": \\\"Object2\\\"}\"]]"); + System.out.println(listOfObjects.toStringUtf8()); DaprProtos.InvokeBindingResponse.Builder responseBuilder = - DaprProtos.InvokeBindingResponse.newBuilder().setData(serialize(list)); + DaprProtos.InvokeBindingResponse.newBuilder().setData(listOfObjects); doAnswer((Answer) invocation -> { StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; observer.onNext(responseBuilder.build()); @@ -422,9 +459,13 @@ public void invokeBindingListResponseObjectTypeRefTest() throws IOException { return null; }).when(daprStub).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); - Mono> result = client.invokeBindingList("BindingName", "MyOperation", null, null, TypeRef.get(MyObject.class)); + List myObjects = client.invokeBindingList("BindingName", "MyOperation", null, null, TypeRef.get(MyObject.class)).block(); - assertEquals(list, result.block()); + assertEquals(2, myObjects.size()); + assertEquals(1, myObjects.get(0).getId()); + assertEquals("Object1", myObjects.get(0).getValue()); + assertEquals(2, myObjects.get(1).getId()); + assertEquals("Object2", myObjects.get(1).getValue()); } @Test