Skip to content

Commit cc4967f

Browse files
HTTP-122 Retry for source lookup table (#148)
* retry for source lookup table * exposing retry metrics * logging optimization * fail on error
1 parent 0ee7808 commit cc4967f

33 files changed

+1369
-449
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ bin
1313
/src/test/test.iml
1414
/flink-http-connector.iml
1515
/dependency-reduced-pom.xml
16+
/.java-version

CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
## [Unreleased]
44

5+
- Retries support for source table:
6+
- Auto retry on IOException and user-defined http codes - parameter `gid.connector.http.source.lookup.retry-codes`.
7+
- Parameters `gid.connector.http.source.lookup.error.code.exclude"` and `gid.connector.http.source.lookup.error.code` were replaced by `gid.connector.http.source.lookup.ignored-response-codes`.
8+
- Added connection timeout for source table - `gid.connector.http.source.lookup.connection.timeout`.
9+
510
## [0.19.0] - 2025-03-20
611

712
- OIDC token request to not flow during explain

README.md

+108-36
Large diffs are not rendered by default.

dev/checkstyle.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
</module>
6666

6767
<module name="LineLength">
68-
<property name="max" value="100"/>
68+
<property name="max" value="120"/>
6969
<property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
7070
</module>
7171

pom.xml

+16-18
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,8 @@ under the License.
7575
<scala.binary.version>2.12</scala.binary.version>
7676
<maven.compiler.source>${target.java.version}</maven.compiler.source>
7777
<maven.compiler.target>${target.java.version}</maven.compiler.target>
78-
<log4j.version>2.17.2</log4j.version>
7978
<lombok.version>1.18.22</lombok.version>
8079
<jackson.version>2.18.1</jackson.version>
81-
<junit4.version>4.13.2</junit4.version>
8280
<junit5.version>5.10.1</junit5.version>
8381
<junit.jupiter.version>${junit5.version}</junit.jupiter.version>
8482
<assertj.core.version>3.21.0</assertj.core.version>
@@ -87,6 +85,8 @@ under the License.
8785
<jacoco.plugin.version>0.8.12</jacoco.plugin.version>
8886
<maven.shade.plugin.version>3.1.1</maven.shade.plugin.version>
8987
<mockito-inline.version>4.6.1</mockito-inline.version>
88+
<resilence4j.version>1.7.1</resilence4j.version>
89+
<slf4j.version>2.0.17</slf4j.version>
9090
</properties>
9191

9292
<repositories>
@@ -119,25 +119,17 @@ under the License.
119119
<scope>provided</scope>
120120
</dependency>
121121

122-
<!-- Add logging framework, to produce console output when running in the IDE. -->
123-
<!-- These dependencies are excluded from the application JAR by default. -->
124-
<dependency>
125-
<groupId>org.apache.logging.log4j</groupId>
126-
<artifactId>log4j-slf4j-impl</artifactId>
127-
<version>${log4j.version}</version>
128-
<scope>provided</scope>
129-
</dependency>
130122
<dependency>
131-
<groupId>org.apache.logging.log4j</groupId>
132-
<artifactId>log4j-api</artifactId>
133-
<version>${log4j.version}</version>
134-
<scope>provided</scope>
123+
<groupId>org.slf4j</groupId>
124+
<artifactId>slf4j-api</artifactId>
125+
<version>${slf4j.version}</version>
135126
</dependency>
127+
<!-- Add logging framework, to produce console output when running in the IDE. -->
136128
<dependency>
137-
<groupId>org.apache.logging.log4j</groupId>
138-
<artifactId>log4j-core</artifactId>
139-
<version>${log4j.version}</version>
140-
<scope>provided</scope>
129+
<groupId>org.slf4j</groupId>
130+
<artifactId>slf4j-simple</artifactId>
131+
<version>${slf4j.version}</version>
132+
<scope>test</scope>
141133
</dependency>
142134

143135
<dependency>
@@ -167,6 +159,12 @@ under the License.
167159
<scope>provided</scope>
168160
</dependency>
169161

162+
<dependency>
163+
<groupId>io.github.resilience4j</groupId>
164+
<artifactId>resilience4j-retry</artifactId>
165+
<version>${resilence4j.version}</version>
166+
</dependency>
167+
170168
<!--TEST-->
171169
<dependency>
172170
<groupId>org.apache.httpcomponents</groupId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.getindata.connectors.http;
2+
3+
import java.net.http.HttpResponse;
4+
5+
import lombok.Getter;
6+
7+
@Getter
8+
public class HttpStatusCodeValidationFailedException extends Exception {
9+
private final HttpResponse<?> response;
10+
11+
public HttpStatusCodeValidationFailedException(String message, HttpResponse<?> response) {
12+
super(message);
13+
this.response = response;
14+
}
15+
}

src/main/java/com/getindata/connectors/http/internal/PollingClient.java

+7
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.Collection;
44

55
import org.apache.flink.table.data.RowData;
6+
import org.apache.flink.table.functions.FunctionContext;
67

78
/**
89
* A client that is used to get enrichment data from external component.
@@ -15,4 +16,10 @@ public interface PollingClient<T> {
1516
* @return an optional result of data lookup.
1617
*/
1718
Collection<T> pull(RowData lookupRow);
19+
20+
/**
21+
* Initialize the client.
22+
* @param ctx function context
23+
*/
24+
void open(FunctionContext ctx);
1825
}

src/main/java/com/getindata/connectors/http/internal/PollingClientFactory.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.io.Serializable;
44

55
import org.apache.flink.api.common.serialization.DeserializationSchema;
6+
import org.apache.flink.util.ConfigurationException;
67

78
import com.getindata.connectors.http.internal.table.lookup.HttpLookupConfig;
89

@@ -11,5 +12,5 @@ public interface PollingClientFactory<OUT> extends Serializable {
1112
PollingClient<OUT> createPollClient(
1213
HttpLookupConfig options,
1314
DeserializationSchema<OUT> schemaDecoder
14-
);
15+
) throws ConfigurationException;
1516
}

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

+30-19
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@ public final class HttpConnectorConfigConstants {
1818
* A property prefix for http connector.
1919
*/
2020
public static final String GID_CONNECTOR_HTTP = "gid.connector.http.";
21+
private static final String SOURCE_LOOKUP_PREFIX = GID_CONNECTOR_HTTP + "source.lookup.";
2122

2223
/**
2324
* A property prefix for http connector header properties
2425
*/
2526
public static final String SINK_HEADER_PREFIX = GID_CONNECTOR_HTTP + "sink.header.";
2627

27-
public static final String LOOKUP_SOURCE_HEADER_PREFIX = GID_CONNECTOR_HTTP
28-
+ "source.lookup.header.";
28+
public static final String LOOKUP_SOURCE_HEADER_PREFIX = SOURCE_LOOKUP_PREFIX + "header.";
2929

3030
public static final String OIDC_AUTH_TOKEN_REQUEST = GID_CONNECTOR_HTTP
3131
+ "security.oidc.token.request";
@@ -40,33 +40,24 @@ public final class HttpConnectorConfigConstants {
4040
* the special treatment of the header for Basic Authentication, thus preserving the passed
4141
* raw value. Defaults to false.
4242
*/
43-
public static final String LOOKUP_SOURCE_HEADER_USE_RAW = GID_CONNECTOR_HTTP
44-
+ "source.lookup.use-raw-authorization-header";
43+
public static final String LOOKUP_SOURCE_HEADER_USE_RAW = SOURCE_LOOKUP_PREFIX + "use-raw-authorization-header";
4544

46-
public static final String RESULT_TYPE = GID_CONNECTOR_HTTP
47-
+ "source.lookup.result-type";
45+
public static final String RESULT_TYPE = SOURCE_LOOKUP_PREFIX + "result-type";
4846

4947
// --------- Error code handling configuration ---------
50-
public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST =
51-
GID_CONNECTOR_HTTP + "sink.error.code.exclude";
48+
public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST = GID_CONNECTOR_HTTP + "sink.error.code.exclude";
5249

5350
public static final String HTTP_ERROR_SINK_CODES_LIST = GID_CONNECTOR_HTTP + "sink.error.code";
54-
55-
public static final String HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST =
56-
GID_CONNECTOR_HTTP + "source.lookup.error.code.exclude";
57-
58-
public static final String HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST =
59-
GID_CONNECTOR_HTTP + "source.lookup.error.code";
6051
// -----------------------------------------------------
6152

6253
public static final String SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER =
63-
GID_CONNECTOR_HTTP + "source.lookup.request-callback";
54+
SOURCE_LOOKUP_PREFIX + "request-callback";
6455

6556
public static final String SINK_REQUEST_CALLBACK_IDENTIFIER =
6657
GID_CONNECTOR_HTTP + "sink.request-callback";
6758

6859
public static final String SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER =
69-
GID_CONNECTOR_HTTP + "source.lookup.query-creator";
60+
SOURCE_LOOKUP_PREFIX + "query-creator";
7061

7162
// -------------- HTTPS security settings --------------
7263
public static final String ALLOW_SELF_SIGNED =
@@ -92,16 +83,19 @@ public final class HttpConnectorConfigConstants {
9283
// ------ HTTPS timeouts and thread pool settings ------
9384

9485
public static final String LOOKUP_HTTP_TIMEOUT_SECONDS =
95-
GID_CONNECTOR_HTTP + "source.lookup.request.timeout";
86+
SOURCE_LOOKUP_PREFIX + "request.timeout";
87+
88+
public static final String SOURCE_CONNECTION_TIMEOUT =
89+
SOURCE_LOOKUP_PREFIX + "connection.timeout";
9690

9791
public static final String SINK_HTTP_TIMEOUT_SECONDS =
9892
GID_CONNECTOR_HTTP + "sink.request.timeout";
9993

10094
public static final String LOOKUP_HTTP_PULING_THREAD_POOL_SIZE =
101-
GID_CONNECTOR_HTTP + "source.lookup.request.thread-pool.size";
95+
SOURCE_LOOKUP_PREFIX + "request.thread-pool.size";
10296

10397
public static final String LOOKUP_HTTP_RESPONSE_THREAD_POOL_SIZE =
104-
GID_CONNECTOR_HTTP + "source.lookup.response.thread-pool.size";
98+
SOURCE_LOOKUP_PREFIX + "response.thread-pool.size";
10599

106100
public static final String SINK_HTTP_WRITER_THREAD_POOL_SIZE =
107101
GID_CONNECTOR_HTTP + "sink.writer.thread-pool.size";
@@ -117,4 +111,21 @@ public final class HttpConnectorConfigConstants {
117111
GID_CONNECTOR_HTTP + "sink.request.batch.size";
118112

119113
// ---------------------------------------------
114+
public static final String SOURCE_RETRY_SUCCESS_CODES = SOURCE_LOOKUP_PREFIX + "success-codes";
115+
public static final String SOURCE_RETRY_RETRY_CODES = SOURCE_LOOKUP_PREFIX + "retry-codes";
116+
public static final String SOURCE_IGNORE_RESPONSE_CODES = SOURCE_LOOKUP_PREFIX + "ignored-response-codes";
117+
118+
public static final String SOURCE_RETRY_STRATEGY_PREFIX = SOURCE_LOOKUP_PREFIX + "retry-strategy.";
119+
public static final String SOURCE_RETRY_STRATEGY_TYPE = SOURCE_RETRY_STRATEGY_PREFIX + "type";
120+
121+
private static final String SOURCE_RETRY_FIXED_DELAY_PREFIX = SOURCE_RETRY_STRATEGY_PREFIX + "fixed-delay.";
122+
public static final String SOURCE_RETRY_FIXED_DELAY_DELAY = SOURCE_RETRY_FIXED_DELAY_PREFIX + "delay";
123+
124+
private static final String SOURCE_RETRY_EXP_DELAY_PREFIX = SOURCE_RETRY_STRATEGY_PREFIX + "exponential-delay.";
125+
public static final String SOURCE_RETRY_EXP_DELAY_INITIAL_BACKOFF =
126+
SOURCE_RETRY_EXP_DELAY_PREFIX + "initial-backoff";
127+
public static final String SOURCE_RETRY_EXP_DELAY_MAX_BACKOFF =
128+
SOURCE_RETRY_EXP_DELAY_PREFIX + "max-backoff";
129+
public static final String SOURCE_RETRY_EXP_DELAY_MULTIPLIER =
130+
SOURCE_RETRY_EXP_DELAY_PREFIX + "backoff-multiplier";
120131
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package com.getindata.connectors.http.internal.retry;
2+
3+
import java.io.IOException;
4+
import java.net.http.HttpClient;
5+
import java.net.http.HttpRequest;
6+
import java.net.http.HttpResponse;
7+
import java.util.function.Supplier;
8+
9+
import io.github.resilience4j.retry.Retry;
10+
import io.github.resilience4j.retry.RetryConfig;
11+
import lombok.Builder;
12+
import lombok.Getter;
13+
import lombok.extern.slf4j.Slf4j;
14+
import org.apache.flink.metrics.MetricGroup;
15+
16+
import com.getindata.connectors.http.HttpStatusCodeValidationFailedException;
17+
import com.getindata.connectors.http.internal.status.HttpResponseChecker;
18+
19+
@Slf4j
20+
public class HttpClientWithRetry {
21+
22+
private final HttpClient httpClient;
23+
@Getter
24+
private final HttpResponseChecker responseChecker;
25+
private final Retry retry;
26+
27+
@Builder
28+
HttpClientWithRetry(HttpClient httpClient,
29+
RetryConfig retryConfig,
30+
HttpResponseChecker responseChecker) {
31+
this.httpClient = httpClient;
32+
this.responseChecker = responseChecker;
33+
var adjustedRetryConfig = RetryConfig.from(retryConfig)
34+
.retryExceptions(IOException.class)
35+
.retryOnResult(this::isTemporalError)
36+
.build();
37+
this.retry = Retry.of("http-lookup-connector", adjustedRetryConfig);
38+
}
39+
40+
public void registerMetrics(MetricGroup metrics){
41+
var group = metrics.addGroup("http_lookup_connector");
42+
group.gauge("successfulCallsWithRetryAttempt",
43+
() -> retry.getMetrics().getNumberOfSuccessfulCallsWithRetryAttempt());
44+
group.gauge("successfulCallsWithoutRetryAttempt",
45+
() -> retry.getMetrics().getNumberOfSuccessfulCallsWithoutRetryAttempt());
46+
}
47+
48+
public <T> HttpResponse<T> send(
49+
Supplier<HttpRequest> requestSupplier,
50+
HttpResponse.BodyHandler<T> responseBodyHandler
51+
) throws IOException, InterruptedException, HttpStatusCodeValidationFailedException {
52+
try {
53+
var response = Retry.decorateCheckedSupplier(retry,
54+
() -> httpClient.send(requestSupplier.get(), responseBodyHandler)).apply();
55+
if (!responseChecker.isSuccessful(response)) {
56+
throw new HttpStatusCodeValidationFailedException(
57+
"Incorrect response code: " + response.statusCode(), response);
58+
}
59+
return response;
60+
} catch (IOException | InterruptedException | HttpStatusCodeValidationFailedException e) {
61+
throw e; //re-throw without wrapping
62+
} catch (Throwable t) {
63+
throw new RuntimeException("Unexpected exception", t);
64+
}
65+
}
66+
67+
private boolean isTemporalError(Object response) {
68+
return responseChecker.isTemporalError((HttpResponse<?>) response);
69+
}
70+
}
71+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.getindata.connectors.http.internal.retry;
2+
3+
import io.github.resilience4j.core.IntervalFunction;
4+
import io.github.resilience4j.retry.RetryConfig;
5+
import lombok.AccessLevel;
6+
import lombok.RequiredArgsConstructor;
7+
import org.apache.flink.configuration.ReadableConfig;
8+
import org.apache.flink.table.connector.source.lookup.LookupOptions;
9+
import static io.github.resilience4j.core.IntervalFunction.ofExponentialBackoff;
10+
11+
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_INITIAL_BACKOFF;
12+
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MAX_BACKOFF;
13+
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MULTIPLIER;
14+
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_RETRY_FIXED_DELAY_DELAY;
15+
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_RETRY_STRATEGY;
16+
17+
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
18+
public class RetryConfigProvider {
19+
20+
private final ReadableConfig config;
21+
22+
public static RetryConfig create(ReadableConfig config) {
23+
return new RetryConfigProvider(config).create();
24+
}
25+
26+
private RetryConfig create() {
27+
return createBuilder()
28+
.maxAttempts(config.get(LookupOptions.MAX_RETRIES) + 1)
29+
.build();
30+
}
31+
32+
private RetryConfig.Builder<?> createBuilder() {
33+
var retryStrategy = getRetryStrategy();
34+
if (retryStrategy == RetryStrategyType.FIXED_DELAY) {
35+
return configureFixedDelay();
36+
} else if (retryStrategy == RetryStrategyType.EXPONENTIAL_DELAY) {
37+
return configureExponentialDelay();
38+
}
39+
throw new IllegalArgumentException("Unsupported retry strategy: " + retryStrategy);
40+
}
41+
42+
private RetryStrategyType getRetryStrategy() {
43+
return RetryStrategyType.fromCode(config.get(SOURCE_LOOKUP_RETRY_STRATEGY));
44+
}
45+
46+
private RetryConfig.Builder<?> configureFixedDelay() {
47+
return RetryConfig.custom()
48+
.intervalFunction(IntervalFunction.of(config.get(SOURCE_LOOKUP_RETRY_FIXED_DELAY_DELAY)));
49+
}
50+
51+
private RetryConfig.Builder<?> configureExponentialDelay() {
52+
var initialDelay = config.get(SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_INITIAL_BACKOFF);
53+
var maxDelay = config.get(SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MAX_BACKOFF);
54+
var multiplier = config.get(SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MULTIPLIER);
55+
return RetryConfig.custom()
56+
.intervalFunction(ofExponentialBackoff(initialDelay, multiplier, maxDelay));
57+
}
58+
}

0 commit comments

Comments
 (0)