|  | 
|  | 1 | +package cloud.stackit.sdk.core.wait; | 
|  | 2 | + | 
|  | 3 | +import cloud.stackit.sdk.core.exception.ApiException; | 
|  | 4 | +import cloud.stackit.sdk.core.exception.GenericOpenAPIException; | 
|  | 5 | + | 
|  | 6 | +import java.net.HttpURLConnection; | 
|  | 7 | +import java.util.Arrays; | 
|  | 8 | +import java.util.HashSet; | 
|  | 9 | +import java.util.Set; | 
|  | 10 | +import java.util.concurrent.CompletableFuture; | 
|  | 11 | +import java.util.concurrent.Executors; | 
|  | 12 | +import java.util.concurrent.ScheduledExecutorService; | 
|  | 13 | +import java.util.concurrent.ScheduledFuture; | 
|  | 14 | +import java.util.concurrent.TimeUnit; | 
|  | 15 | +import java.util.concurrent.TimeoutException; | 
|  | 16 | +import java.util.concurrent.atomic.AtomicInteger; | 
|  | 17 | + | 
|  | 18 | +public class AsyncActionHandler<T> { | 
|  | 19 | +	public static final Set<Integer> RETRY_HTTP_ERROR_STATUS_CODES = | 
|  | 20 | +			new HashSet<>( | 
|  | 21 | +					Arrays.asList( | 
|  | 22 | +							HttpURLConnection.HTTP_BAD_GATEWAY, | 
|  | 23 | +							HttpURLConnection.HTTP_GATEWAY_TIMEOUT)); | 
|  | 24 | + | 
|  | 25 | +	public static final String TEMPORARY_ERROR_MESSAGE = | 
|  | 26 | +			"Temporary error was found and the retry limit was reached."; | 
|  | 27 | +	public static final String TIMEOUT_ERROR_MESSAGE = "WaitWithContextAsync() has timed out."; | 
|  | 28 | +	public static final String NON_GENERIC_API_ERROR_MESSAGE = "Found non-GenericOpenAPIError."; | 
|  | 29 | + | 
|  | 30 | +	private final CheckFunction<AsyncActionResult<T>> checkFn; | 
|  | 31 | + | 
|  | 32 | +	private long sleepBeforeWaitMillis; | 
|  | 33 | +	private long throttleMillis; | 
|  | 34 | +	private long timeoutMillis; | 
|  | 35 | +	private int tempErrRetryLimit; | 
|  | 36 | + | 
|  | 37 | +	// The linter is complaining about this but since we are using Java 8 the | 
|  | 38 | +	// possibilities are restricted. | 
|  | 39 | +	@SuppressWarnings("PMD.DoNotUseThreads") | 
|  | 40 | +	private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); | 
|  | 41 | + | 
|  | 42 | +	public AsyncActionHandler(CheckFunction<AsyncActionResult<T>> checkFn) { | 
|  | 43 | +		this.checkFn = checkFn; | 
|  | 44 | +		this.sleepBeforeWaitMillis = 0; | 
|  | 45 | +		this.throttleMillis = TimeUnit.SECONDS.toMillis(5); | 
|  | 46 | +		this.timeoutMillis = TimeUnit.MINUTES.toMillis(30); | 
|  | 47 | +		this.tempErrRetryLimit = 5; | 
|  | 48 | +	} | 
|  | 49 | + | 
|  | 50 | +	/** | 
|  | 51 | +	 * SetThrottle sets the time interval between each check of the async action. | 
|  | 52 | +	 * | 
|  | 53 | +	 * @param duration | 
|  | 54 | +	 * @param unit | 
|  | 55 | +	 */ | 
|  | 56 | +	public void setThrottle(long duration, TimeUnit unit) { | 
|  | 57 | +		this.throttleMillis = unit.toMillis(duration); | 
|  | 58 | +	} | 
|  | 59 | + | 
|  | 60 | +	/** | 
|  | 61 | +	 * SetTimeout sets the duration for wait timeout. | 
|  | 62 | +	 * | 
|  | 63 | +	 * @param duration | 
|  | 64 | +	 * @param unit | 
|  | 65 | +	 */ | 
|  | 66 | +	public void setTimeout(long duration, TimeUnit unit) { | 
|  | 67 | +		this.timeoutMillis = unit.toMillis(duration); | 
|  | 68 | +	} | 
|  | 69 | + | 
|  | 70 | +	/** | 
|  | 71 | +	 * SetSleepBeforeWait sets the duration for sleep before wait. | 
|  | 72 | +	 * | 
|  | 73 | +	 * @param duration | 
|  | 74 | +	 * @param unit | 
|  | 75 | +	 */ | 
|  | 76 | +	public void setSleepBeforeWait(long duration, TimeUnit unit) { | 
|  | 77 | +		this.sleepBeforeWaitMillis = unit.toMillis(duration); | 
|  | 78 | +	} | 
|  | 79 | + | 
|  | 80 | +	/** | 
|  | 81 | +	 * SetTempErrRetryLimit sets the retry limit if a temporary error is found. The list of | 
|  | 82 | +	 * temporary errors is defined in the RetryHttpErrorStatusCodes variable. | 
|  | 83 | +	 * | 
|  | 84 | +	 * @param limit | 
|  | 85 | +	 */ | 
|  | 86 | +	public void setTempErrRetryLimit(int limit) { | 
|  | 87 | +		this.tempErrRetryLimit = limit; | 
|  | 88 | +	} | 
|  | 89 | + | 
|  | 90 | +	/** | 
|  | 91 | +	 * Runnable task which is executed periodically. | 
|  | 92 | +	 *  | 
|  | 93 | +	 * @param future | 
|  | 94 | +	 * @param startTime | 
|  | 95 | +	 * @param retryTempErrorCounter | 
|  | 96 | +	 */ | 
|  | 97 | +	private void executeCheckTask(CompletableFuture<T> future, long startTime, AtomicInteger retryTempErrorCounter) { | 
|  | 98 | +		if (future.isDone()) { | 
|  | 99 | +        	return; | 
|  | 100 | +    	} | 
|  | 101 | +		if (System.currentTimeMillis() - startTime >= timeoutMillis) { | 
|  | 102 | +			future.completeExceptionally(new TimeoutException(TIMEOUT_ERROR_MESSAGE)); | 
|  | 103 | +		} | 
|  | 104 | +		try { | 
|  | 105 | +			AsyncActionResult<T> result = checkFn.execute(); | 
|  | 106 | +			if (result != null && result.isFinished()) { | 
|  | 107 | +				future.complete(result.getResponse()); | 
|  | 108 | +			}							 | 
|  | 109 | +		} catch (ApiException  e) { | 
|  | 110 | +				GenericOpenAPIException oapiErr = new GenericOpenAPIException(e); | 
|  | 111 | +				// Some APIs may return temporary errors and the request should be retried | 
|  | 112 | +				if (!RETRY_HTTP_ERROR_STATUS_CODES.contains(oapiErr.getStatusCode())) { | 
|  | 113 | +					return; | 
|  | 114 | +				} | 
|  | 115 | +				if (retryTempErrorCounter.incrementAndGet() == tempErrRetryLimit) { | 
|  | 116 | +					// complete the future with corresponding exception | 
|  | 117 | +					future.completeExceptionally(new Exception(TEMPORARY_ERROR_MESSAGE, oapiErr)); | 
|  | 118 | +				} | 
|  | 119 | +		} catch (IllegalStateException e) { | 
|  | 120 | +			future.completeExceptionally(e); | 
|  | 121 | +		} | 
|  | 122 | +	} | 
|  | 123 | + | 
|  | 124 | +	/** | 
|  | 125 | +	 * WaitWithContextAsync starts the wait until there's an error or wait is done | 
|  | 126 | +	 * | 
|  | 127 | +	 * @return | 
|  | 128 | +	 */ | 
|  | 129 | +	@SuppressWarnings("PMD.DoNotUseThreads") | 
|  | 130 | +	public CompletableFuture<T> waitWithContextAsync() { | 
|  | 131 | +		if (throttleMillis <= 0) { | 
|  | 132 | +			throw new IllegalArgumentException("Throttle can't be 0 or less"); | 
|  | 133 | +		} | 
|  | 134 | + | 
|  | 135 | +		CompletableFuture<T> future = new CompletableFuture<>(); | 
|  | 136 | +		long startTime = System.currentTimeMillis(); | 
|  | 137 | +		AtomicInteger retryTempErrorCounter = new AtomicInteger(0); | 
|  | 138 | + | 
|  | 139 | +		// This runnable is called periodically. | 
|  | 140 | +		Runnable checkTask = | 
|  | 141 | +				() -> executeCheckTask(future, startTime, retryTempErrorCounter); | 
|  | 142 | + | 
|  | 143 | +		// start the periodic execution | 
|  | 144 | +		ScheduledFuture<?> scheduledFuture = | 
|  | 145 | +				scheduler.scheduleAtFixedRate( | 
|  | 146 | +						checkTask, sleepBeforeWaitMillis, throttleMillis, TimeUnit.MILLISECONDS); | 
|  | 147 | + | 
|  | 148 | +		// stop task when future is completed | 
|  | 149 | +		future.whenComplete( | 
|  | 150 | +				(result, error) -> { | 
|  | 151 | +					scheduledFuture.cancel(true); | 
|  | 152 | +					scheduler.shutdown(); | 
|  | 153 | +				}); | 
|  | 154 | + | 
|  | 155 | +		return future; | 
|  | 156 | +	} | 
|  | 157 | + | 
|  | 158 | +	// Helper class to encapsulate the result of the checkFn | 
|  | 159 | +	public static class AsyncActionResult<T> { | 
|  | 160 | +		private final boolean finished; | 
|  | 161 | +		private final T response; | 
|  | 162 | + | 
|  | 163 | +		public AsyncActionResult(boolean finished, T response) { | 
|  | 164 | +			this.finished = finished; | 
|  | 165 | +			this.response = response; | 
|  | 166 | +		} | 
|  | 167 | + | 
|  | 168 | +		public boolean isFinished() { | 
|  | 169 | +			return finished; | 
|  | 170 | +		} | 
|  | 171 | + | 
|  | 172 | +		public T getResponse() { | 
|  | 173 | +			return response; | 
|  | 174 | +		}	 | 
|  | 175 | +	} | 
|  | 176 | + | 
|  | 177 | +	/** | 
|  | 178 | +	 * Helper function to check http status codes during deletion of a resource. | 
|  | 179 | +	 * @param e ApiException to check | 
|  | 180 | +	 * @return true if resource is gone otherwise false | 
|  | 181 | +	 */ | 
|  | 182 | +	public static boolean checkResourceGoneStatusCodes(ApiException apiException) { | 
|  | 183 | +		GenericOpenAPIException oapiErr = new GenericOpenAPIException(apiException); | 
|  | 184 | +		return oapiErr.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND | 
|  | 185 | +				|| oapiErr.getStatusCode() == HttpURLConnection.HTTP_FORBIDDEN; | 
|  | 186 | +	} | 
|  | 187 | +} | 
0 commit comments