diff --git a/CHANGELOG.md b/CHANGELOG.md index c7fbc729..63664a69 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## [Unreleased] +- Added support for generic json and URL query creator + - Retries support for source table: - Auto retry on IOException and user-defined http codes - parameter `gid.connector.http.source.lookup.retry-codes`. - Parameters `gid.connector.http.source.lookup.error.code.exclude"` and `gid.connector.http.source.lookup.error.code` were replaced by `gid.connector.http.source.lookup.ignored-response-codes`. @@ -13,6 +15,8 @@ ## [0.18.0] - 2025-01-15 +### Fixed + - Ignore Eclipse files in .gitignore - Support Flink 1.20 diff --git a/README.md b/README.md index 882047ee..eecd33e9 100644 --- a/README.md +++ b/README.md @@ -148,6 +148,12 @@ with HTTP Lookup connector and SQL queries. To create a custom format user has to implement Flink's `SerializationSchema` and `SerializationFormatFactory` interfaces and register custom format factory along other factories in `resources/META-INF.services/org.apache.flink.table.factories.Factory` file. This is common Flink mechanism for providing custom implementations for various factories. +The most flexible query creator is the [GenericJsonAndUrlQueryCreator](src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java) +which allows column content to be mapped as URL, path, body and query parameter request values; it supports +POST, PUT and GET operations. This query creator allows you to issue json requests without needing to code +your own custom http connector. The mappings from columns to the json request are supplied in the query creator configuration +parameters `gid.connector.http.request.query-param-fields`, `gid.connector.http.request.body-fields` and `gid.connector.http.request.url-map`. + In order to use custom format, user has to specify option `'lookup-request.format' = 'customFormatName'`, where `customFormatName` is the identifier of custom format factory. Additionally, it is possible to pass query format options from table's DDL. @@ -513,42 +519,45 @@ be requested if the current time is later than the cached token expiry time minu ## Table API Connector Options ### HTTP TableLookup Source -| Option | Required | Description/Value | -|--------------------------------------------------------------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| connector | required | The Value should be set to _rest-lookup_ | -| format | required | Flink's format name that should be used to decode REST response, Use `json` for a typical REST endpoint. | -| url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_ | -| asyncPolling | optional | true/false - determines whether Async Polling should be used. Mechanism is based on Flink's Async I/O. | -| lookup-method | optional | GET/POST/PUT (and any other) - determines what REST method should be used for lookup REST query. If not specified, `GET` method will be used. | -| lookup.cache | optional | Enum possible values: `NONE`, `PARTIAL`. The cache strategy for the lookup table. Currently supports `NONE` (no caching) and `PARTIAL` (caching entries on lookup operation in external API). | -| lookup.partial-cache.max-rows | optional | The max number of rows of lookup cache, over this value, the oldest rows will be expired. `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. | -| lookup.partial-cache.expire-after-write | optional | The max time to live for each rows in lookup cache after writing into the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. | -| lookup.partial-cache.expire-after-access | optional | The max time to live for each rows in lookup cache after accessing the entry in the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. | -| lookup.partial-cache.cache-missing-key | optional | This is a boolean that defaults to true. Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. | -| lookup.max-retries | optional | The max retry times if the lookup failed; default is 3. See the following Lookup Cache section for more detail. Set value 0 to disable retries. | -| gid.connector.http.lookup.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Source, separated with comma. | -| gid.connector.http.lookup.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.lookup.error.code` list, separated with comma. | -| gid.connector.http.security.cert.server | optional | Comma separated paths to trusted HTTP server certificates that should be added to the connectors trust store. | -| gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. | -| gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. | -| gid.connector.http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. | -| gid.connector.http.security.oidc.token.request | optional | OIDC `Token Request` body in `application/x-www-form-urlencoded` encoding | -| gid.connector.http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued | -| gid.connector.http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. | -| gid.connector.http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. | -| gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. | -| gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. | -| gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. | -| gid.connector.http.source.lookup.request-callback | optional | Specify which `HttpLookupPostRequestCallback` implementation to use. By default, it is set to `slf4j-lookup-logger` corresponding to `Slf4jHttpLookupPostRequestCallback`. | -| gid.connector.http.source.lookup.connection.timeout | optional | Source table connection timeout. Default - no value. | -| gid.connector.http.source.lookup.success-codes | optional | Comma separated http codes considered as success response. Use [1-5]XX for groups and '!' character for excluding. | -| gid.connector.http.source.lookup.retry-codes | optional | Comma separated http codes considered as transient errors. Use [1-5]XX for groups and '!' character for excluding. | -| gid.connector.http.source.lookup.ignored-response-codes | optional | Comma separated http codes. Content for these responses will be ignored. Use [1-5]XX for groups and '!' character for excluding. Ignored responses togater with `gid.connector.http.source.lookup.success-codes` are considered as successful. | -| gid.connector.http.source.lookup.retry-strategy.type | optional | Auto retry strategy type: fixed-delay (default) or exponential-delay. | -| gid.connector.http.source.lookup.retry-strategy.fixed-delay.delay | optional | Fixed-delay interval between retries. Default 1 second. Use with`lookup.max-retries` parameter. | -| gid.connector.http.source.lookup.retry-strategy.exponential-delay.initial-backoff | optional | Exponential-delay initial delay. Default 1 second. | -| gid.connector.http.source.lookup.retry-strategy.exponential-delay.max-backoff | optional | Exponential-delay maximum delay. Default 1 minute. Use with `lookup.max-retries` parameter. | -| gid.connector.http.source.lookup.retry-strategy.exponential-delay.backoff-multiplier | optional | Exponential-delay multiplier. Default value 1.5 | +| Option | Required | Description/Value | +|--------------------------------------------------------------------------------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| connector | required | The Value should be set to _rest-lookup_ | +| format | required | Flink's format name that should be used to decode REST response, Use `json` for a typical REST endpoint. | +| url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_ | +| asyncPolling | optional | true/false - determines whether Async Polling should be used. Mechanism is based on Flink's Async I/O. | +| lookup-method | optional | GET/POST/PUT (and any other) - determines what REST method should be used for lookup REST query. If not specified, `GET` method will be used. | +| lookup.cache | optional | Enum possible values: `NONE`, `PARTIAL`. The cache strategy for the lookup table. Currently supports `NONE` (no caching) and `PARTIAL` (caching entries on lookup operation in external API). | +| lookup.partial-cache.max-rows | optional | The max number of rows of lookup cache, over this value, the oldest rows will be expired. `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. | +| lookup.partial-cache.expire-after-write | optional | The max time to live for each rows in lookup cache after writing into the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. | +| lookup.partial-cache.expire-after-access | optional | The max time to live for each rows in lookup cache after accessing the entry in the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. | +| lookup.partial-cache.cache-missing-key | optional | This is a boolean that defaults to true. Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. | +| lookup.max-retries | optional | The max retry times if the lookup failed; default is 3. See the following Lookup Cache section for more detail. Set value 0 to disable retries. | +| gid.connector.http.lookup.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Source, separated with comma. | +| gid.connector.http.lookup.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.lookup.error.code` list, separated with comma. | +| gid.connector.http.security.cert.server | optional | Comma separated paths to trusted HTTP server certificates that should be added to the connectors trust store. | +| gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. | +| gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. | +| gid.connector.http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. | +| gid.connector.http.security.oidc.token.request | optional | OIDC `Token Request` body in `application/x-www-form-urlencoded` encoding | +| gid.connector.http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued | +| gid.connector.http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. | +| gid.connector.http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. | +| gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. | +| gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. | +| gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. | +| gid.connector.http.source.lookup.request-callback | optional | Specify which `HttpLookupPostRequestCallback` implementation to use. By default, it is set to `slf4j-lookup-logger` corresponding to `Slf4jHttpLookupPostRequestCallback`. | +| gid.connector.http.source.lookup.connection.timeout | optional | Source table connection timeout. Default - no value. | +| gid.connector.http.source.lookup.success-codes | optional | Comma separated http codes considered as success response. Use [1-5]XX for groups and '!' character for excluding. | +| gid.connector.http.source.lookup.retry-codes | optional | Comma separated http codes considered as transient errors. Use [1-5]XX for groups and '!' character for excluding. | +| gid.connector.http.source.lookup.ignored-response-codes | optional | Comma separated http codes. Content for these responses will be ignored. Use [1-5]XX for groups and '!' character for excluding. Ignored responses togater with `gid.connector.http.source.lookup.success-codes` are considered as successful. | +| gid.connector.http.source.lookup.retry-strategy.type | optional | Auto retry strategy type: fixed-delay (default) or exponential-delay. | +| gid.connector.http.source.lookup.retry-strategy.fixed-delay.delay | optional | Fixed-delay interval between retries. Default 1 second. Use with`lookup.max-retries` parameter. | +| gid.connector.http.source.lookup.retry-strategy.exponential-delay.initial-backoff | optional | Exponential-delay initial delay. Default 1 second. | +| gid.connector.http.source.lookup.retry-strategy.exponential-delay.max-backoff | optional | Exponential-delay maximum delay. Default 1 minute. Use with `lookup.max-retries` parameter. | +| gid.connector.http.source.lookup.retry-strategy.exponential-delay.backoff-multiplier | optional | Exponential-delay multiplier. Default value 1.5 | +| gid.connector.http.request.query-param-fields | optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The names of the fields that will be mapped to query parameters. The parameters are separated by semicolons, such as `param1;param2`. | +| gid.connector.http.request.body-fields | optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The names of the fields that will be mapped to the body. The parameters are separated by semicolons, such as `param1;param2`. | | +| gid.connector.http.request.url-map | optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The map of insert names to column names used as url segments. Parses a string as a map of strings. For example if there are table columns called `customerId` and `orderId`, then specifying value `customerId:cid1,orderID:oid` and a url of https://myendpoint/customers/{cid}/orders/{oid} will mean that the url used for the lookup query will dynamically pickup the values for `customerId`, `orderId` and use them in the url. The expected format of the map is: `key1:value1,key2:value2`. | ### HTTP Sink diff --git a/pom.xml b/pom.xml index aa0eeafd..8e32e4ef 100644 --- a/pom.xml +++ b/pom.xml @@ -146,6 +146,26 @@ under the License. provided + + org.apache.flink + flink-format-common + ${flink.version} + provided + + + + org.apache.flink + flink-json + ${flink.version} + provided + + + + com.fasterxml.jackson.core + jackson-core + ${jackson.version} + + com.fasterxml.jackson.core jackson-databind @@ -224,15 +244,6 @@ under the License. test - - - org.apache.flink - flink-json - ${flink.version} - test - - - org.apache.flink flink-runtime-web diff --git a/src/main/java/com/getindata/connectors/http/LookupQueryCreatorFactory.java b/src/main/java/com/getindata/connectors/http/LookupQueryCreatorFactory.java index 169edde2..bade9351 100644 --- a/src/main/java/com/getindata/connectors/http/LookupQueryCreatorFactory.java +++ b/src/main/java/com/getindata/connectors/http/LookupQueryCreatorFactory.java @@ -37,6 +37,9 @@ public interface LookupQueryCreatorFactory extends Factory, Serializable { /** + * @param readableConfig readable config + * @param lookupRow lookup row + * @param dynamicTableFactoryContext context * @return {@link LookupQueryCreator} custom lookup query creator instance */ LookupQueryCreator createLookupQueryCreator( diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/RowDataSingleValueLookupSchemaEntry.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/RowDataSingleValueLookupSchemaEntry.java index 31ed02ee..e977ca76 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/RowDataSingleValueLookupSchemaEntry.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/RowDataSingleValueLookupSchemaEntry.java @@ -47,7 +47,7 @@ public List convertToLookupArg(RowData lookupKeyRow) { } if (!(value instanceof BinaryStringData)) { - log.debug("Unsupported Key Type {}. Trying simple toString(), wish me luck...", + log.debug("Unsupported Key Type {}. Trying simple toString().", value.getClass()); } diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/TableSourceHelper.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/TableSourceHelper.java index 6b9cae7f..3996fed8 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/TableSourceHelper.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/TableSourceHelper.java @@ -22,6 +22,8 @@ public final class TableSourceHelper { *

Note: This method returns an empty list for every {@link DataType} that is not a * composite * type. + * @param type logical type + * @return List of field names */ public static List getFieldNames(LogicalType type) { diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java new file mode 100644 index 00000000..f67a25a0 --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java @@ -0,0 +1,265 @@ +/* + * © Copyright IBM Corp. 2025 + */ + +package com.getindata.connectors.http.internal.table.lookup.querycreators; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.*; + +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.table.api.DataTypes.Field; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.FieldsDataType; +import org.apache.flink.types.Row; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import com.getindata.connectors.http.LookupArg; +import com.getindata.connectors.http.LookupQueryCreator; +import com.getindata.connectors.http.internal.table.lookup.LookupQueryInfo; +import com.getindata.connectors.http.internal.table.lookup.LookupRow; +import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils; + +/** + * Generic JSON and URL query creator; in addition to be able to map columns to json requests, + * it allows url inserts to be mapped to column names using templating. + *
+ * For GETs, column names are mapped to query parameters. e.g. for + * GenericJsonAndUrlQueryCreator.REQUEST_PARAM_FIELDS = "id1;id2" + * and url of http://base. At lookup time with values of id1=1 and id2=2 a call of + * http/base?id1=1&id2=2 will be issued. + *
+ * For PUT and POST, parameters are mapped to the json body e.g. for + * REQUEST_PARAM_FIELDS = "id1;id2" and url of http://base. At lookup time with values of id1=1 and + * id2=2 as call of http/base will be issued with a json payload of {"id1":1,"id2":2} + *
+ * For all http methods, url segments can be used to include lookup up values. Using the map from + * GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP which has a key of the insert and the + * value of the associated column. + * e.g. for GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP = "key1":"col1" + * and url of http://base/{key1}. At lookup time with values of col1="aaaa" a call of + * http/base/aaaa will be issued. + * + */ +@Slf4j +public class GenericJsonAndUrlQueryCreator implements LookupQueryCreator { + private static final long serialVersionUID = 1L; + + // not final so we can mutate for unit test + private SerializationSchema serializationSchema; + private boolean schemaOpened = false; + private LookupRow lookupRow; + private final String httpMethod; + private final List requestQueryParamsFields; + private final List requestBodyFields; + private final Map requestUrlMap; + + /** + * Construct a Generic JSON and URL query creator. + * + * @param httpMethod the requested http method + * @param serializationSchema serialization schema for RowData + * @param requestQueryParamsFields query param fields + * @param requestBodyFields body fields used for PUT and POSTs + * @param requestUrlMap url map + * @param lookupRow lookup row itself. + */ + public GenericJsonAndUrlQueryCreator(final String httpMethod, + final SerializationSchema + serializationSchema, + final List requestQueryParamsFields, + final List requestBodyFields, + final Map requestUrlMap, + final LookupRow lookupRow) { + this.httpMethod = httpMethod; + this.serializationSchema = serializationSchema; + this.lookupRow = lookupRow; + this.requestQueryParamsFields = requestQueryParamsFields; + this.requestBodyFields = requestBodyFields; + this.requestUrlMap = requestUrlMap; + } + @VisibleForTesting + void setSerializationSchema(SerializationSchema + serializationSchema) { + this.serializationSchema = serializationSchema; + } + + @Override + public LookupQueryInfo createLookupQuery(final RowData lookupDataRow) { + this.checkOpened(); + + final String lookupQuery; + Map bodyBasedUrlQueryParams = new HashMap<>(); + final Collection lookupArgs = + lookupRow.convertToLookupArgs(lookupDataRow); + ObjectNode jsonObject; + try { + jsonObject = (ObjectNode) ObjectMapperAdapter.instance().readTree( + serializationSchema.serialize(lookupDataRow)); + } catch (IOException e) { + String message = "Unable to parse the lookup arguments to json."; + log.error(message, e); + throw new RuntimeException(message, e); + } + // Parameters are encoded as query params for GET and none GET. + // Later code will turn these query params into the body for PUTs and POSTs + ObjectNode jsonObjectForQueryParams = ObjectMapperAdapter.instance().createObjectNode(); + for (String requestColumnName : this.requestQueryParamsFields) { + jsonObjectForQueryParams.set(requestColumnName, jsonObject.get(requestColumnName)); + } + // TODO can we convertToQueryParameters for all ops + // and not use/deprecate bodyBasedUrlQueryParams + if (httpMethod.equalsIgnoreCase("GET")) { + // add the query parameters + lookupQuery = convertToQueryParameters(jsonObjectForQueryParams, + StandardCharsets.UTF_8.toString()); + } else { + // Body-based queries + // serialize to a string for the body. + try { + lookupQuery = ObjectMapperAdapter.instance() + .writeValueAsString(jsonObject.retain(requestBodyFields)); + } catch (JsonProcessingException e) { + final String message = "Unable to convert Json Object to a string"; + throw new RuntimeException(message,e); + } + // body parameters + // use the request json object to scope the required fields and the lookupArgs as values + bodyBasedUrlQueryParams = createBodyBasedParams(lookupArgs, + jsonObjectForQueryParams); + } + // add the path map + final Map pathBasedUrlParams = createPathBasedParams(lookupArgs, + requestUrlMap); + + return new LookupQueryInfo(lookupQuery, bodyBasedUrlQueryParams, pathBasedUrlParams); + + } + + /** + * Create a Row from a RowData and DataType + * @param lookupRowData the lookup RowData + * @param rowType the datatype + * @return row return row + */ + @VisibleForTesting + static Row rowDataToRow(final RowData lookupRowData, final DataType rowType) { + Preconditions.checkNotNull(lookupRowData); + Preconditions.checkNotNull(rowType); + + final Row row = Row.withNames(); + final List rowFields = FieldsDataType.getFields(rowType); + + for (int idx = 0; idx < rowFields.size(); idx++) { + final String fieldName = rowFields.get(idx).getName(); + final Object fieldValue = ((GenericRowData) lookupRowData).getField(idx); + row.setField(fieldName, fieldValue); + } + return row; + } + + /** + * Create map of the json key to the lookup argument + * value. This is used for body based content. + * @param args lookup arguments + * @param objectNode object node + * @return map of field content to the lookup argument value. + */ + private Map createBodyBasedParams(final Collection args, + ObjectNode objectNode ) { + Map mapOfJsonKeyToLookupArg = new LinkedHashMap<>(); + Iterator> iterator = objectNode.fields(); + iterator.forEachRemaining(field -> { + for (final LookupArg arg : args) { + if (arg.getArgName().equals(field.getKey())) { + String keyForMap = field.getKey(); + mapOfJsonKeyToLookupArg.put( + keyForMap, arg.getArgValue()); + } + } + }); + + return mapOfJsonKeyToLookupArg; + } + /** + * Create map of the json key to the lookup argument + * value. This is used for path based content. + * @param args lookup arguments + * @param urlMap map of insert name to column name + * @return map of field content to the lookup argument value. + */ + private Map createPathBasedParams(final Collection args, + Map urlMap ) { + Map mapOfJsonKeyToLookupArg = new LinkedHashMap<>(); + if (urlMap != null) { + for (String key: urlMap.keySet()) { + for (final LookupArg arg : args) { + if (arg.getArgName().equals(key)) { + mapOfJsonKeyToLookupArg.put( + urlMap.get(key), arg.getArgValue()); + } + } + } + } + return mapOfJsonKeyToLookupArg; + } + /** + * Convert json object to query params string + * @param jsonObject supplies json object + * @param enc encoding string - used in unit test to drive unsupported encoding + * @return query params string + */ + @VisibleForTesting + static String convertToQueryParameters(final ObjectNode jsonObject, String enc) { + Preconditions.checkNotNull(jsonObject); + + final StringJoiner result = new StringJoiner("&"); + jsonObject.fields().forEachRemaining(field -> { + final String fieldName = field.getKey(); + final String fieldValue = field.getValue().asText(); + + try { + result.add(fieldName + "=" + + URLEncoder.encode(fieldValue, enc)); + } catch (UnsupportedEncodingException e) { + final String message = + "Failed to encode the value of the query parameter name " + + fieldName + + ": " + + fieldValue; + throw new RuntimeException(message, e); + } + }); + + return result.toString(); + } + + private void checkOpened() { + if (!this.schemaOpened) { + try { + this.serializationSchema.open( + SerializationSchemaUtils + .createSerializationInitContext( + GenericJsonAndUrlQueryCreator.class)); + this.schemaOpened = true; + } catch (final Exception e) { + final String message = + "Failed to initialize serialization schema for " + + GenericJsonAndUrlQueryCreator.class; + log.error(message, e); + throw new FlinkRuntimeException(message, e); + } + } + } +} diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java new file mode 100644 index 00000000..1ed9dab1 --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java @@ -0,0 +1,142 @@ +/* + * © Copyright IBM Corp. 2025 + */ + +package com.getindata.connectors.http.internal.table.lookup.querycreators; + +import java.util.*; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import static org.apache.flink.configuration.ConfigOptions.key; + +import com.getindata.connectors.http.LookupQueryCreator; +import com.getindata.connectors.http.LookupQueryCreatorFactory; +import com.getindata.connectors.http.internal.table.lookup.LookupRow; +import com.getindata.connectors.http.internal.utils.SynchronizedSerializationSchema; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.ASYNC_POLLING; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_METHOD; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_REQUEST_FORMAT; + +/** + * Generic JSON and url query creator factory defined configuration to define the columns to be + *

    + *
  1. List of column names to be included in the query params
  2. + *
  3. List of column names to be included in the body (for PUT and POST)
  4. + *
  5. Map of templated uri segment names to column names
  6. + *
+ */ +@SuppressWarnings({"checkstyle:RegexpSingleline", "checkstyle:LineLength"}) +public class GenericJsonAndUrlQueryCreatorFactory implements LookupQueryCreatorFactory { + private static final long serialVersionUID = 1L; + + public static final String ID = "generic-json-url"; + + public static final ConfigOption> REQUEST_QUERY_PARAM_FIELDS = + key("gid.connector.http.request.query-param-fields") + .stringType() + .asList() + .defaultValues() //default to empty list so we do not need to check for null + .withDescription( + "The names of the fields that will be mapped to query parameters." + + " The parameters are separated by semicolons," + + " such as 'param1;param2'."); + public static final ConfigOption> REQUEST_BODY_FIELDS = + key("gid.connector.http.request.body-fields") + .stringType() + .asList() + .defaultValues() //default to empty list so we do not need to check for null + .withDescription( + "The names of the fields that will be mapped to the body." + + " The parameters are separated by semicolons," + + " such as 'param1;param2'."); + public static final ConfigOption> REQUEST_URL_MAP = + ConfigOptions.key("gid.connector.http.request.url-map") + .mapType() + .noDefaultValue() + .withDescription("The map of insert names to column names used" + + "as url segments. Parses a string as a map of strings. " + + "
" + + "For example if there are table columns called customerId" + + " and orderId, then specifying value customerId:cid1,orderID:oid" + + " and a url of https://myendpoint/customers/{cid}/orders/{oid}" + + " will mean that the url used for the lookup query will" + + " dynamically pickup the values for customerId, orderId" + + " and use them in the url." + + "
Notes
" + + "The expected format of the map is:" + + "
" + + " key1:value1,key2:value2" + ); + + @Override + public LookupQueryCreator createLookupQueryCreator(final ReadableConfig readableConfig, + final LookupRow lookupRow, + final DynamicTableFactory.Context + dynamicTableFactoryContext) { + final String httpMethod = readableConfig.get(LOOKUP_METHOD); + final String formatIdentifier = readableConfig.get(LOOKUP_REQUEST_FORMAT); + // get the information from config + final List requestQueryParamsFields = + readableConfig.get(REQUEST_QUERY_PARAM_FIELDS); + final List requestBodyFields = + readableConfig.get(REQUEST_BODY_FIELDS); + Map requestUrlMap = readableConfig.get(REQUEST_URL_MAP); + + final SerializationFormatFactory jsonFormatFactory = + FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), + SerializationFormatFactory.class, formatIdentifier); + QueryFormatAwareConfiguration queryFormatAwareConfiguration = + new QueryFormatAwareConfiguration( + LOOKUP_REQUEST_FORMAT.key() + "." + formatIdentifier, + (Configuration) readableConfig); + EncodingFormat> + encoder = jsonFormatFactory.createEncodingFormat( + dynamicTableFactoryContext, + queryFormatAwareConfiguration + ); + + final SerializationSchema jsonSerializationSchema; + if (readableConfig.get(ASYNC_POLLING)) { + jsonSerializationSchema = new SynchronizedSerializationSchema<>( + encoder.createRuntimeEncoder(null, + lookupRow.getLookupPhysicalRowDataType())); + } else { + jsonSerializationSchema = + encoder.createRuntimeEncoder(null, + lookupRow.getLookupPhysicalRowDataType()); + } + // create using config parameter values and specify serialization + // schema from json format. + return new GenericJsonAndUrlQueryCreator(httpMethod, + jsonSerializationSchema, + requestQueryParamsFields, + requestBodyFields, + requestUrlMap, + lookupRow); + } + + @Override + public String factoryIdentifier() { + return ID; + } + + @Override + public Set> requiredOptions() { + return Set.of(); + } + @Override + public Set> optionalOptions() { + return Set.of(REQUEST_QUERY_PARAM_FIELDS, + REQUEST_BODY_FIELDS, + REQUEST_URL_MAP); + } +} diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/ObjectMapperAdapter.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/ObjectMapperAdapter.java new file mode 100644 index 00000000..d8d8aa34 --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/ObjectMapperAdapter.java @@ -0,0 +1,28 @@ +package com.getindata.connectors.http.internal.table.lookup.querycreators; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MapperFeature; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.json.JsonMapper; + +/** + * Centralizes the use of {@link ObjectMapper}. + */ +public class ObjectMapperAdapter { + private static final ObjectMapper MAPPER = initialize(); + + private static ObjectMapper initialize() { + final ObjectMapper mapper = JsonMapper.builder() + .configure(MapperFeature.USE_STD_BEAN_NAMING, + false).build(); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + mapper.disable(SerializationFeature.WRITE_DATES_WITH_ZONE_ID); + mapper.disable(DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE); + return mapper; + } + + public static ObjectMapper instance() { + return MAPPER; + } +} diff --git a/src/main/java/com/getindata/connectors/http/internal/utils/uri/URIBuilder.java b/src/main/java/com/getindata/connectors/http/internal/utils/uri/URIBuilder.java index 7f0b29e7..287e3322 100644 --- a/src/main/java/com/getindata/connectors/http/internal/utils/uri/URIBuilder.java +++ b/src/main/java/com/getindata/connectors/http/internal/utils/uri/URIBuilder.java @@ -80,7 +80,8 @@ public URIBuilder(URI uri) { /** * Construct an instance from the provided URI. * - * @param uri + * @param uri supplied uri + * @param charset character set */ public URIBuilder(final URI uri, final Charset charset) { super(); @@ -95,6 +96,9 @@ public URIBuilder(final URI uri, final Charset charset) { * Please note query parameters and custom query component are mutually exclusive. This method * will remove custom query if present. *

+ * @param param parameter to add + * @param value value to add + * @return the URI builder */ public URIBuilder addParameter(final String param, final String value) { if (this.queryParams == null) { @@ -108,6 +112,8 @@ public URIBuilder addParameter(final String param, final String value) { /** * Builds a {@link URI} instance. + * @return URI + * @throws URISyntaxException URI syntax Exception */ public URI build() throws URISyntaxException { return new URI(buildString()); diff --git a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index 47fb2c71..943d0b8c 100644 --- a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -4,4 +4,5 @@ com.getindata.connectors.http.internal.table.lookup.querycreators.GenericGetQuer com.getindata.connectors.http.internal.table.lookup.querycreators.GenericJsonQueryCreatorFactory com.getindata.connectors.http.internal.table.lookup.Slf4jHttpLookupPostRequestCallbackFactory com.getindata.connectors.http.internal.table.sink.HttpDynamicTableSinkFactory -com.getindata.connectors.http.internal.table.sink.Slf4jHttpPostRequestCallbackFactory \ No newline at end of file +com.getindata.connectors.http.internal.table.sink.Slf4jHttpPostRequestCallbackFactory +com.getindata.connectors.http.internal.table.lookup.querycreators.GenericJsonAndUrlQueryCreatorFactory \ No newline at end of file diff --git a/src/test/java/com/getindata/connectors/http/internal/HttpsConnectionTestBase.java b/src/test/java/com/getindata/connectors/http/internal/HttpsConnectionTestBase.java index 28cba6be..c597aa56 100644 --- a/src/test/java/com/getindata/connectors/http/internal/HttpsConnectionTestBase.java +++ b/src/test/java/com/getindata/connectors/http/internal/HttpsConnectionTestBase.java @@ -11,9 +11,9 @@ public abstract class HttpsConnectionTestBase { - protected static final int SERVER_PORT = 9090; + public static final int SERVER_PORT = 9090; - protected static final int HTTPS_SERVER_PORT = 8443; + public static final int HTTPS_SERVER_PORT = 8443; protected static final String ENDPOINT = "/myendpoint"; diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/CustomFormatFactory.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/CustomFormatFactory.java index 2af9f155..fb4b8443 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/CustomFormatFactory.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/CustomFormatFactory.java @@ -18,6 +18,10 @@ public class CustomFormatFactory implements SerializationFormatFactory { public static final String IDENTIFIER = "query-creator-test-format"; public static final String REQUIRED_OPTION = "required-option-one"; + + /** + * TODO remove static - used for testing only + */ static boolean requiredOptionsWereUsed = false; @Override diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/CustomJsonFormatFactory.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/CustomJsonFormatFactory.java new file mode 100644 index 00000000..9fdde425 --- /dev/null +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/CustomJsonFormatFactory.java @@ -0,0 +1,51 @@ +package com.getindata.connectors.http.internal.table.lookup.querycreators; + +import java.util.Collections; +import java.util.Set; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.json.JsonFormatFactory; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableFactory.Context; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; + +public class CustomJsonFormatFactory extends JsonFormatFactory + implements SerializationFormatFactory { + + public static final String IDENTIFIER = "json-query-creator-test-format"; + public static final String REQUIRED_OPTION = "required-option-one"; + + /** + * Consider removing this static only used for testing only + */ + static boolean requiredOptionsWereUsed = false; + + @Override + public EncodingFormat> createEncodingFormat( + Context context, + ReadableConfig readableConfig) { + FactoryUtil.validateFactoryOptions(this, readableConfig); + return super.createEncodingFormat(context, readableConfig); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + requiredOptionsWereUsed = true; + return Set.of(ConfigOptions.key(REQUIRED_OPTION).stringType().noDefaultValue()); + } + + @Override + public Set> optionalOptions() { + return Collections.emptySet(); + } +} diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactoryTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactoryTest.java new file mode 100644 index 00000000..83fa3826 --- /dev/null +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactoryTest.java @@ -0,0 +1,117 @@ +/* + * © Copyright IBM Corp. 2025 + */ +package com.getindata.connectors.http.internal.table.lookup.querycreators; + +import java.util.List; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.*; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.getindata.connectors.http.LookupQueryCreator; +import com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions; +import com.getindata.connectors.http.internal.table.lookup.LookupQueryInfo; +import com.getindata.connectors.http.internal.table.lookup.LookupRow; +import com.getindata.connectors.http.internal.table.lookup.RowDataSingleValueLookupSchemaEntry; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSourceFactory.row; +import static com.getindata.connectors.http.internal.table.lookup.querycreators.GenericJsonAndUrlQueryCreatorFactory.*; +import static com.getindata.connectors.http.internal.table.lookup.querycreators.QueryCreatorUtils.getTableContext; + +class GenericJsonAndUrlQueryCreatorFactoryTest +{ + private Configuration config = new Configuration(); + + private DynamicTableFactory.Context tableContext; + + @BeforeEach + public void setUp() { + CustomJsonFormatFactory.requiredOptionsWereUsed = false; + this.tableContext = getTableContext(this.config, ResolvedSchema.of(Column.physical("key1", + DataTypes.STRING()))); + } + + @Test + public void lookupQueryInfoTestStr() { + assertThat(CustomJsonFormatFactory.requiredOptionsWereUsed) + .withFailMessage( + "CustomJsonFormat was not cleared, " + + "make sure `CustomJsonFormatFactory.requiredOptionsWereUsed" + + "= false` " + + "was called before this test execution.") + .isFalse(); + + this.config.setString("lookup-request.format", CustomJsonFormatFactory.IDENTIFIER); + this.config.setString( + String.format("lookup-request.format.%s.%s", CustomJsonFormatFactory.IDENTIFIER, + CustomJsonFormatFactory.REQUIRED_OPTION), "optionValue"); + this.config.set(REQUEST_QUERY_PARAM_FIELDS, List.of("key1")); + // with sync + createUsingFactory(false); + // with async + createUsingFactory(true); + } + + @Test + public void lookupQueryInfoTestRequiredConfig() { + GenericJsonAndUrlQueryCreatorFactory genericJsonAndUrlQueryCreatorFactory = + new GenericJsonAndUrlQueryCreatorFactory(); + assertThrows(RuntimeException.class, () -> { + genericJsonAndUrlQueryCreatorFactory.createLookupQueryCreator(config, + null, + null); + }); + // do not specify REQUEST_ARG_PATHS_CONFIG + assertThrows(RuntimeException.class, () -> { + genericJsonAndUrlQueryCreatorFactory.createLookupQueryCreator(config, + null, + null); + }); + } + + private void createUsingFactory(boolean async) { + this.config.setBoolean(HttpLookupConnectorOptions.ASYNC_POLLING, async); + LookupRow lookupRow= new LookupRow() + .addLookupEntry( + new RowDataSingleValueLookupSchemaEntry( + "key1", + RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0) + )); + + lookupRow.setLookupPhysicalRowDataType( + row(List.of( + DataTypes.FIELD("key1", DataTypes.STRING()) + ))); + LookupQueryCreator lookupQueryCreator = new + GenericJsonAndUrlQueryCreatorFactory().createLookupQueryCreator( + config, + lookupRow, + tableContext + ); + GenericRowData lookupRowData = GenericRowData.of( + StringData.fromString("val1") + ); + + LookupQueryInfo lookupQueryInfo = lookupQueryCreator.createLookupQuery(lookupRowData); + assertThat(CustomJsonFormatFactory.requiredOptionsWereUsed).isTrue(); + assertThat(lookupQueryInfo.hasLookupQuery()).isTrue(); + assertThat(lookupQueryInfo.hasBodyBasedUrlQueryParameters()).isFalse(); + assertThat(lookupQueryInfo.hasPathBasedUrlParameters()).isFalse(); + } + @Test + void optionsTests() { + GenericJsonAndUrlQueryCreatorFactory factory = new GenericJsonAndUrlQueryCreatorFactory(); + assertThat(factory.requiredOptions()).isEmpty(); + assertThat(factory.optionalOptions()).contains(REQUEST_QUERY_PARAM_FIELDS); + assertThat(factory.optionalOptions()).contains(REQUEST_BODY_FIELDS); + assertThat(factory.optionalOptions()).contains(REQUEST_URL_MAP); + } +} diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java new file mode 100644 index 00000000..ae3b85ee --- /dev/null +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java @@ -0,0 +1,258 @@ +/* + * © Copyright IBM Corp. 2025 + */ +package com.getindata.connectors.http.internal.table.lookup.querycreators; + +import java.util.List; +import java.util.Map; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.FieldsDataType; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.getindata.connectors.http.internal.table.lookup.LookupQueryInfo; +import com.getindata.connectors.http.internal.table.lookup.LookupRow; +import com.getindata.connectors.http.internal.table.lookup.RowDataSingleValueLookupSchemaEntry; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_METHOD; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSourceFactory.row; +import static com.getindata.connectors.http.internal.table.lookup.querycreators.QueryCreatorUtils.getTableContext; + +class GenericJsonAndUrlQueryCreatorTest { + private static final String KEY_1 = "key1"; + private static final String KEY_2 = "key2"; + private static final String KEY_3 = "key3"; + private static final String VALUE = "val1"; + // for GET this is the minimum config + private static final List QUERY_PARAMS = List.of(KEY_1); + // Path param ArgPath required a stringified json object. As we have PersonBean + // we can use that. + private static final Map URL_PARAMS = Map.of(KEY_1, KEY_1); + private static final DataType DATATYPE_1 = row(List.of( + DataTypes.FIELD(KEY_1, DataTypes.STRING()) + )); + private static final DataType DATATYPE_1_2 = row(List.of( + DataTypes.FIELD(KEY_1, DataTypes.STRING()), + DataTypes.FIELD(KEY_2, DataTypes.STRING()) + )); + private static final ResolvedSchema RESOLVED_SCHEMA = ResolvedSchema.of(Column.physical(KEY_1, + DataTypes.STRING())); + private static final RowData ROWDATA = getRowData(1, VALUE); + + @ParameterizedTest + @ValueSource(strings = {"GET", "PUT", "POST" }) + public void createLookupQueryTestStrAllOps(String operation) { + //GIVEN + LookupRow lookupRow = getLookupRow(KEY_1); + Configuration config = getConfiguration(operation); + GenericJsonAndUrlQueryCreator universalJsonQueryCreator = + (GenericJsonAndUrlQueryCreator) new GenericJsonAndUrlQueryCreatorFactory() + .createLookupQueryCreator( + config, + lookupRow, + getTableContext(config, + RESOLVED_SCHEMA) + ); + // WHEN + var createdQuery = universalJsonQueryCreator.createLookupQuery(ROWDATA); + // THEN + if (operation.equals("GET")) { + validateCreatedQueryForGet(createdQuery); + } else { + validateCreatedQueryForPutAndPost(createdQuery); + } + // validate url based parameters + assertThat(createdQuery.getPathBasedUrlParameters().size() == 1).isTrue(); + assertThat(createdQuery.getPathBasedUrlParameters().get(KEY_1)).isEqualTo(VALUE); + } + + @Test + public void createLookupQueryTest() { + // GIVEN + List query_params = List.of(KEY_1, KEY_2); + final String URL_INSERT = "AAA"; + Map url_params = Map.of(KEY_1, URL_INSERT); + LookupRow lookupRow = getLookupRow(KEY_1, KEY_2); + ResolvedSchema resolvedSchema = ResolvedSchema.of( + Column.physical(KEY_1, DataTypes.STRING()), + Column.physical(KEY_2, DataTypes.STRING())); + Configuration config = getConfiguration("GET"); + config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_QUERY_PARAM_FIELDS, query_params); + config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_URL_MAP, url_params); + lookupRow.setLookupPhysicalRowDataType(DATATYPE_1_2); + GenericJsonAndUrlQueryCreator genericJsonAndUrlQueryCreator = + (GenericJsonAndUrlQueryCreator) new GenericJsonAndUrlQueryCreatorFactory() + .createLookupQueryCreator( + config, + lookupRow, + getTableContext(config, + resolvedSchema) + ); + var row = getRowData(2, VALUE); + row.setField(1, StringData.fromString(VALUE)); + // WHEN + var createdQuery = genericJsonAndUrlQueryCreator.createLookupQuery(row); + // THEN + assertThat(createdQuery.getPathBasedUrlParameters().get(URL_INSERT)).isEqualTo(VALUE); + assertThat(createdQuery.getBodyBasedUrlQueryParameters()).isEmpty(); + assertThat(createdQuery.getLookupQuery()).isEqualTo(KEY_1 + "=" + VALUE + + "&" + KEY_2 + "=" + VALUE); + } + + @Test + public void failSerializationOpenTest() { + // GIVEN + LookupRow lookupRow = getLookupRow(KEY_1); + ResolvedSchema resolvedSchema = ResolvedSchema.of(Column.physical(KEY_1, + DataTypes.STRING())); + Configuration config = getConfiguration("GET"); + lookupRow.setLookupPhysicalRowDataType(DATATYPE_1); + GenericJsonAndUrlQueryCreator genericJsonAndUrlQueryCreator = + (GenericJsonAndUrlQueryCreator) new GenericJsonAndUrlQueryCreatorFactory() + .createLookupQueryCreator( + config, + lookupRow, + getTableContext(config, + resolvedSchema) + ); + // create a SerializationSchema that throws and exception in open + SerializationSchema mockSerialiationSchema = new SerializationSchema() { + @Override + public void open(InitializationContext context) throws Exception { + throw new Exception("Exception for testing"); + } + @Override + public byte[] serialize(RowData element) { + return new byte[0]; + } + }; + // WHEN + genericJsonAndUrlQueryCreator.setSerializationSchema(mockSerialiationSchema); + var row = new GenericRowData(1); + // THEN + assertThrows(RuntimeException.class, () -> { + genericJsonAndUrlQueryCreator.createLookupQuery(row); + }); + } + @Test + void convertToQueryParametersUnsupportedEncodingTest() { + // GIVEN + ObjectMapper mapper = ObjectMapperAdapter.instance(); + PersonBean person = new PersonBean("aaa", "bbb"); + // WHEN + JsonNode personNode = mapper.valueToTree(person); + // THEN + assertThrows(RuntimeException.class, () -> { + GenericJsonAndUrlQueryCreator.convertToQueryParameters( + (ObjectNode) personNode, "bad encoding"); + }); + } + @Test + void rowDataToRowTest() { + // GIVEN + // String + final String value = VALUE; + int intValue = 10; + GenericRowData rowData = GenericRowData.of( + StringData.fromString(value), + intValue, + intValue + ); + DataType dataType = row(List.of( + DataTypes.FIELD(KEY_1, DataTypes.STRING()), + DataTypes.FIELD(KEY_2, DataTypes.DATE()), + DataTypes.FIELD(KEY_3, DataTypes.TIMESTAMP_LTZ()) + )); + // WHEN + Row row = rowDataToRow(rowData, dataType); + // THEN + assertThat(row.getField(KEY_1).equals(value)); + assertThat(row.getField(KEY_2).equals("1970-01-01T00:00:00.010")); + assertThat(row.getField(KEY_3).equals("1970-01-01T00:00:00.010Z")); + } + + private static void validateCreatedQueryForGet( LookupQueryInfo createdQuery) { + // check there is no body params and we have the expected lookup query + assertThat(createdQuery.getBodyBasedUrlQueryParameters()).isEmpty(); + assertThat(createdQuery.getLookupQuery()).isEqualTo(KEY_1 + "=" + VALUE); + } + + private static void validateCreatedQueryForPutAndPost(LookupQueryInfo createdQuery) { + // check we have the expected body params and lookup query + assertThat(createdQuery + .getBodyBasedUrlQueryParameters()) + .isEqualTo(KEY_1 + "=" + VALUE); + assertThat(createdQuery.getLookupQuery()).isEqualTo( + "{\"" + + KEY_1 + + "\":\"" + VALUE + + "\"}"); + } + + private static @NotNull GenericRowData getRowData(int arity, String value) { + var row = new GenericRowData(arity); + row.setField(0, StringData.fromString(value)); + return row; + } + + private static @NotNull Configuration getConfiguration(String operation) { + Configuration config = new Configuration(); + config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_QUERY_PARAM_FIELDS, + QUERY_PARAMS); + if (!operation.equals("GET")) { + // add the body content for PUT and POST + config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_BODY_FIELDS, + QUERY_PARAMS); + } + config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_URL_MAP, URL_PARAMS); + config.setString(LOOKUP_METHOD, operation); + return config; + } + + private static @NotNull LookupRow getLookupRow(String... keys ) { + + LookupRow lookupRow = new LookupRow(); + for (int keyNumber = 0; keyNumber < keys.length; keyNumber++) { + lookupRow.addLookupEntry( + new RowDataSingleValueLookupSchemaEntry( + keys[keyNumber], + RowData.createFieldGetter( + DataTypes.STRING().getLogicalType(), keyNumber) + )); + lookupRow.setLookupPhysicalRowDataType(DATATYPE_1); + } + return lookupRow; + } + + private static Row rowDataToRow(final RowData lookupRowData, final DataType rowType) { + Preconditions.checkNotNull(lookupRowData); + Preconditions.checkNotNull(rowType); + + final Row row = Row.withNames(); + final List rowFields = FieldsDataType.getFields(rowType); + + for (int idx = 0; idx < rowFields.size(); idx++) { + final String fieldName = rowFields.get(idx).getName(); + final Object fieldValue = ((GenericRowData) lookupRowData).getField(idx); + row.setField(fieldName, fieldValue); + } + return row; + } +} diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/PathBean.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/PathBean.java new file mode 100644 index 00000000..40103ae2 --- /dev/null +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/PathBean.java @@ -0,0 +1,11 @@ +/* + * © Copyright IBM Corp. 2025 + */ +package com.getindata.connectors.http.internal.table.lookup.querycreators; + +import lombok.Data; + +@Data +public class PathBean { + private String key1; +} diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/PersonBean.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/PersonBean.java new file mode 100644 index 00000000..5ca4a0e0 --- /dev/null +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/PersonBean.java @@ -0,0 +1,12 @@ +/* + * © Copyright IBM Corp. 2025 + */ +package com.getindata.connectors.http.internal.table.lookup.querycreators; + +import lombok.Data; + +@Data +public class PersonBean { + private final String firstName; + private final String lastName; +} diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/QueryCreatorUtils.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/QueryCreatorUtils.java new file mode 100644 index 00000000..6ccdb444 --- /dev/null +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/QueryCreatorUtils.java @@ -0,0 +1,60 @@ +package com.getindata.connectors.http.internal.table.lookup.querycreators; + +import java.util.Collections; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; + +import com.getindata.connectors.http.internal.table.lookup.LookupRow; + +public class QueryCreatorUtils { + public static DynamicTableFactory.Context getTableContext(Configuration config, + ResolvedSchema resolvedSchema) { + + return new FactoryUtil.DefaultDynamicTableContext( + ObjectIdentifier.of("default", "default", "test"), + new ResolvedCatalogTable( + CatalogTable.of( + Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), + null, + Collections.emptyList(), + Collections.emptyMap()), + resolvedSchema), + Collections.emptyMap(), + config, + Thread.currentThread().getContextClassLoader(), + false + ); + } + public static SerializationSchema getRowDataSerializationSchema(LookupRow lookupRow, + DynamicTableFactory.Context dynamicTableFactoryContext, + String formatIdentifier, + QueryFormatAwareConfiguration queryFormatAwareConfiguration) { + SerializationFormatFactory jsonFormatFactory = + FactoryUtil.discoverFactory( + dynamicTableFactoryContext.getClassLoader(), + SerializationFormatFactory.class, + formatIdentifier + ); + + EncodingFormat> + encoder = jsonFormatFactory.createEncodingFormat( + dynamicTableFactoryContext, + queryFormatAwareConfiguration + ); + + final SerializationSchema serializationSchema = + encoder.createRuntimeEncoder(null, lookupRow.getLookupPhysicalRowDataType()); + return serializationSchema; + } +} diff --git a/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory index df4f5338..065a168b 100644 --- a/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -1,3 +1,4 @@ com.getindata.connectors.http.TestPostRequestCallbackFactory com.getindata.connectors.http.TestLookupPostRequestCallbackFactory -com.getindata.connectors.http.internal.table.lookup.querycreators.CustomFormatFactory \ No newline at end of file +com.getindata.connectors.http.internal.table.lookup.querycreators.CustomFormatFactory +com.getindata.connectors.http.internal.table.lookup.querycreators.CustomJsonFormatFactory \ No newline at end of file