diff --git a/README.md b/README.md index 1e4b318..1738380 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,65 @@ * **min_task_size** (experimental): minimum bytesize of a task. If this is larger than 0, one task includes multiple input files up until it becomes the bytesize in total. This is useful if too many number of tasks impacts performance of output or executor plugins badly. (integer, optional) +- **client_config**: configure S3 client config (optional) + + - **protocol**: (enum, `HTTP` or `HTTPS`. default: `"HTTPS"`) + + - **max_connections**: (int, default: `50`) + + - **user_agent** (string, optional) + + - **local_address**: name of a hostname (string, optional) + + - **proxy_host**: name of a hostname (string, optional) + + - **proxy_port**: (int, optional) + + - **proxy_username**: (string, optional) + + - **proxy_password**: (string, optional) + + - **proxy_domain**: (string, optional) + + - **proxy_workstation**: (string, optional) + + - **max_error_retry**: (int, default: `3`) + + - **socket_timeout**: (duration, default: `8min`) + + - **connection_timeout**: (duration, default: `50sec`) + + - **request_timeout**: (duration, default: no timeout) + + - **use_reaper**: (boolean, optional) + + - **use_gzip**: (boolean, optional) + + - **signer_override**: (string, optional) + + - **preemptive_basic_proxy_auth**: (boolean, optional) + + - **connection_ttl**: (duration, optional) + + - **connection_max_idle**: (duration, default: `60sec`) + + - **use_tcp_keep_alive**: (boolean, optional) + + - **response_metadata_cache_size**: (bytesize, optional) + + - **use_expect_continue**: (boolean, optional) + + - **secure_random**: (optional) + + - **algorithm**: (string, required) + + - **provider**: (string, optional) + + - **socket_send_buffer_size_hint**: (bytesize, optional) + + - **socket_receive_buffer_size_hint**: (bytesize, optional) + + ## Example ```yaml diff --git a/embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java b/embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java index 3f50eae..9df4387 100644 --- a/embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java +++ b/embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java @@ -1,19 +1,15 @@ package org.embulk.input.s3; import java.util.List; -import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.io.IOException; import java.io.InterruptedIOException; import java.io.InputStream; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import com.google.common.base.Optional; import com.google.common.base.Throwables; import org.slf4j.Logger; -import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.ListObjectsRequest; @@ -23,7 +19,6 @@ import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.ClientConfiguration; import com.amazonaws.AmazonServiceException; -import com.amazonaws.Protocol; import org.embulk.config.Config; import org.embulk.config.ConfigInject; import org.embulk.config.ConfigDefault; @@ -65,7 +60,9 @@ public interface PluginTask @ConfigDefault("null") public Optional getAccessKeyId(); - // TODO timeout, ssl, etc + @Config("client_config") + @ConfigDefault("{}") + public AwsClientConfigurationTask getClientConfig(); public FileList getFiles(); public void setFiles(FileList files); @@ -129,14 +126,7 @@ protected AWSCredentialsProvider getCredentialsProvider(PluginTask task) protected ClientConfiguration getClientConfiguration(PluginTask task) { - ClientConfiguration clientConfig = new ClientConfiguration(); - - //clientConfig.setProtocol(Protocol.HTTP); - clientConfig.setMaxConnections(50); // SDK default: 50 - clientConfig.setMaxErrorRetry(3); // SDK default: 3 - clientConfig.setSocketTimeout(8*60*1000); // SDK default: 50*1000 - - return clientConfig; + return AwsClientConfigurations.getClientConfiguration(task.getClientConfig()); } private FileList listFiles(PluginTask task) diff --git a/embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurationTask.java b/embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurationTask.java new file mode 100644 index 0000000..db36af8 --- /dev/null +++ b/embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurationTask.java @@ -0,0 +1,147 @@ +package org.embulk.input.s3; + +import com.google.common.base.Optional; +import com.amazonaws.Protocol; +import org.embulk.config.Config; +import org.embulk.config.ConfigDefault; +import org.embulk.config.Task; +import org.embulk.spi.unit.ByteSize; + +public interface AwsClientConfigurationTask + extends Task +{ + // NOTE: Can use `client_execution_timeout` from v1.10.65 + // @Config("client_execution_timeout") + // @ConfigDefault("null") + // Optional getClientExecutionTimeout(); + + @Config("connection_max_idle") + @ConfigDefault("null") + Optional getConnectionMaxIdle(); + + @Config("connection_timeout") + @ConfigDefault("null") // SDK default: 50sec + Optional getConnectionTimeout(); + + @Config("connection_ttl") + @ConfigDefault("null") + Optional getConnectionTTL(); + + // NOTE: DnsResolver is a interface + // @Config("dns_resolver") + // @ConfigDefault("null") + // Optional getDnsResolver(); + + @Config("local_address") + @ConfigDefault("null") + Optional getLocalAddress(); + + @Config("max_connections") + @ConfigDefault("50") + Optional getMaxConnections(); + + @Config("max_error_retry") + @ConfigDefault("3") + Optional getMaxErrorRetry(); + + @Config("preemptive_basic_proxy_auth") + @ConfigDefault("null") + Optional getPreemptiveBasicProxyAuth(); + + @Config("protocol") + @ConfigDefault("null") // SDK default: HTTPS + Optional getProtocol(); + + @Config("proxy_domain") + @ConfigDefault("null") + Optional getProxyDomain(); + + @Config("proxy_host") + @ConfigDefault("null") + Optional getProxyHost(); + + @Config("proxy_password") + @ConfigDefault("null") + Optional getProxyPassword(); + + @Config("proxy_port") + @ConfigDefault("null") + Optional getProxyPort(); + + @Config("proxy_username") + @ConfigDefault("null") + Optional getProxyUsername(); + + @Config("proxy_workstation") + @ConfigDefault("null") + Optional getProxyWorkstation(); + + @Config("request_timeout") + @ConfigDefault("null") + Optional getRequestTimeout(); + + @Config("response_metadata_cache_size") + @ConfigDefault("null") + Optional getResponseMetadataCacheSize(); + + // NOTE: RetryPolicy is a interface + // @Config("retry_policy") + // @ConfigDefault("null") + // Optional getRetryPolicy(); + + @Config("secure_random") + @ConfigDefault("null") + Optional getSecureRandom(); + + public interface SecureRandomTask + extends org.embulk.config.Task + { + @Config("algorithm") + String getAlgorithm(); + + @Config("provider") + @ConfigDefault("null") + Optional getProvider(); + } + + @Config("signer_override") + @ConfigDefault("null") + Optional getSignerOverride(); + + @Config("socket_timeout") + @ConfigDefault("\"8min\"") + Optional getSocketTimeout(); + + @Config("use_expect_continue") + @ConfigDefault("null") + Optional getUseExpectContinue(); + + @Config("use_gzip") + @ConfigDefault("null") + Optional getUseGzip(); + + @Config("user_agent") + @ConfigDefault("null") + Optional getUserAgent(); + + @Config("use_reaper") + @ConfigDefault("null") + Optional getUseReaper(); + + @Config("use_tcp_keep_alive") + @ConfigDefault("null") + Optional getUseTcpKeepAlive(); + + // NOTE: Can use `use_throttle_retries` from v1.10.65 + // @Config("use_throttle_retries") + // @ConfigDefault("null") + // Optional getUseThrottleRetries(); + + @Config("socket_send_buffer_size_hint") // used by setSocketBufferSizeHints + @ConfigDefault("null") + Optional getSocketSendBufferSizeHint(); + + @Config("socket_receive_buffer_size_hint") // used by setSocketBufferSizeHints + @ConfigDefault("null") + Optional getSocketReceiveBufferSizeHint(); +} diff --git a/embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurations.java b/embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurations.java new file mode 100644 index 0000000..9a81238 --- /dev/null +++ b/embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurations.java @@ -0,0 +1,156 @@ +package org.embulk.input.s3; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import org.embulk.config.ConfigException; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.security.NoSuchAlgorithmException; +import java.security.NoSuchProviderException; +import java.security.SecureRandom; + +public class AwsClientConfigurations +{ + public static ClientConfiguration getClientConfiguration(AwsClientConfigurationTask task) + { + ClientConfiguration c = new ClientConfiguration(); + + //if (task.getClientExecutionTimeout().isPresent()) { + // c.setClientExecutionTimeout(task.getClientExecutionTimeout().get()); + //} + + if (task.getConnectionMaxIdle().isPresent()) { + c.setConnectionMaxIdleMillis(task.getConnectionMaxIdle().get().getMillis()); + } + + if (task.getConnectionTimeout().isPresent()) { + c.setConnectionTimeout((int) task.getConnectionTimeout().get().getMillis()); + } + + if (task.getConnectionTTL().isPresent()) { + c.setConnectionTTL(task.getConnectionTTL().get().getMillis()); + } + + //if (task.getDnsResolver().isPresent()) { + // c.setDnsResolver(task.getDnsResolver().get()); + //} + + if (task.getLocalAddress().isPresent()) { + try { + InetAddress addr = InetAddress.getByName(task.getLocalAddress().get()); + c.setLocalAddress(addr); + } + catch (UnknownHostException e) { + throw new ConfigException("Invalid local_address", e); + } + } + + if (task.getMaxConnections().isPresent()) { + c.setMaxConnections(task.getMaxConnections().get()); + } + + if (task.getMaxErrorRetry().isPresent()) { + c.setMaxErrorRetry(task.getMaxErrorRetry().get()); + } + + if (task.getPreemptiveBasicProxyAuth().isPresent()) { + c.setPreemptiveBasicProxyAuth(task.getPreemptiveBasicProxyAuth().get()); + } + + if (task.getProtocol().isPresent()) { + c.setProtocol(task.getProtocol().get()); + } + + if (task.getProxyDomain().isPresent()) { + c.setProxyDomain(task.getProxyDomain().get()); + } + + if (task.getProxyHost().isPresent()) { + c.setProxyHost(task.getProxyHost().get()); + } + + if (task.getProxyPassword().isPresent()) { + c.setProxyPassword(task.getProxyPassword().get()); + } + + if (task.getProxyPort().isPresent()) { + c.setProxyPort(task.getProxyPort().get()); + } + + if (task.getProxyUsername().isPresent()) { + c.setProxyUsername(task.getProxyUsername().get()); + } + + if (task.getProxyWorkstation().isPresent()) { + c.setProxyWorkstation(task.getProxyWorkstation().get()); + } + + if (task.getRequestTimeout().isPresent()) { + c.setRequestTimeout((int) task.getRequestTimeout().get().getMillis()); + } + + if (task.getResponseMetadataCacheSize().isPresent()) { + c.setResponseMetadataCacheSize(task.getResponseMetadataCacheSize().get().getBytesInt()); + } + + //if (task.getRetryPolicy().isPresent()) { + // c.setRetryPolicy(task.getRetryPolicy().get()); + //} + + if (task.getSecureRandom().isPresent()) { + try { + AwsClientConfigurationTask.SecureRandomTask secureRandomTask = task.getSecureRandom().get(); + SecureRandom rand = + secureRandomTask.getProvider().isPresent() + ? SecureRandom.getInstance(secureRandomTask.getAlgorithm(), secureRandomTask.getProvider().get()) + : SecureRandom.getInstance(secureRandomTask.getAlgorithm()); + c.setSecureRandom(rand); + } + catch (NoSuchAlgorithmException | NoSuchProviderException e) { + throw new ConfigException("Invalid secure_random", e); + } + } + + if (task.getSignerOverride().isPresent()) { + c.setSignerOverride(task.getSignerOverride().get()); + } + + if (task.getSocketTimeout().isPresent()) { + c.setSocketTimeout(task.getSocketTimeout().get().getMillisInt()); + } + + if (task.getUseExpectContinue().isPresent()) { + c.setUseExpectContinue(task.getUseExpectContinue().get()); + } + + if (task.getUseGzip().isPresent()) { + c.setUseGzip(task.getUseGzip().get()); + } + + if (task.getUserAgent().isPresent()) { + c.setUserAgent(task.getUserAgent().get()); + } + + if (task.getUseReaper().isPresent()) { + c.setUseReaper(task.getUseReaper().get()); + } + + if (task.getUseTcpKeepAlive().isPresent()) { + c.setUseTcpKeepAlive(task.getUseTcpKeepAlive().get()); + } + + //if (task.getUseThrottleRetries().isPresent()) { + // c.setUseThrottleRetries(task.getUseThrottleRetries().get()); + //} + + if (task.getSocketSendBufferSizeHint().isPresent() && task.getSocketReceiveBufferSizeHint().isPresent()) { + c.setSocketBufferSizeHints(task.getSocketSendBufferSizeHint().get().getBytesInt(), task.getSocketReceiveBufferSizeHint().get().getBytesInt()); + } + else if (task.getSocketSendBufferSizeHint().isPresent() || task.getSocketReceiveBufferSizeHint().isPresent()) { + throw new ConfigException("socket_send_buffer_size_hint and socket_receive_buffer_size_hint must set together"); + } + + return c; + } +} diff --git a/embulk-input-s3/src/main/java/org/embulk/input/s3/Duration.java b/embulk-input-s3/src/main/java/org/embulk/input/s3/Duration.java new file mode 100644 index 0000000..98a80c8 --- /dev/null +++ b/embulk-input-s3/src/main/java/org/embulk/input/s3/Duration.java @@ -0,0 +1,159 @@ +package org.embulk.input.s3; + +import java.util.Objects; +import java.util.Locale; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.concurrent.TimeUnit; +import com.google.common.base.Preconditions; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; + +// This class should be moved to org.embulk.spi.unit +public class Duration + implements Comparable +{ + private static final Pattern PATTERN = Pattern.compile("\\A(\\d+(?:\\.\\d+)?)\\s?([a-zA-Z]*)\\z"); + + private final long usec; + private final Unit displayUnit; + + public Duration(double duration, Unit unit) + { + Preconditions.checkArgument(!Double.isInfinite(duration), "duration is infinite"); + Preconditions.checkArgument(!Double.isNaN(duration), "duration is not a number"); + Preconditions.checkArgument(duration >= 0, "duration is negative"); + Preconditions.checkNotNull(unit, "unit is null"); + Preconditions.checkArgument(duration * unit.getFactorToUsec() <= Long.MAX_VALUE, "duration is large than (2^63)-1 in milliseconds"); + this.usec = (long) (duration * unit.getFactorToUsec()); + this.displayUnit = unit; + } + + @JsonCreator + @Deprecated + public Duration(long usec) + { + Preconditions.checkArgument(usec >= 0, "duration is negative"); + this.usec = usec; + this.displayUnit = Unit.MSEC; + } + + public long getMillis() + { + return usec / 1000; + } + + public int getMillisInt() + { + if (usec / 1000 > Integer.MAX_VALUE) { + throw new RuntimeException("Duration is too large (must be smaller than (2^31)-1 milliseconds, abount 24 days)"); + } + return (int) (usec / 1000); + } + + public long roundTo(Unit unit) + { + return (long) Math.floor(getValue(unit) + 0.5); + } + + public double getValue(Unit unit) + { + return usec / (double) unit.getFactorToUsec(); + } + + @JsonCreator + public static Duration parseDuration(String duration) + { + Preconditions.checkNotNull(duration, "duration is null"); + Preconditions.checkArgument(!duration.isEmpty(), "duration is empty"); + + Matcher matcher = PATTERN.matcher(duration); + if (!matcher.matches()) { + throw new IllegalArgumentException("Invalid time duration string '" + duration + "'"); + } + + double value = Double.parseDouble(matcher.group(1)); // NumberFormatException extends IllegalArgumentException. + + String unitString = matcher.group(2); + if (unitString.isEmpty()) { + return new Duration(value, Unit.SECONDS); + } else { + String upperUnitString = unitString.toUpperCase(Locale.ENGLISH); + for (Unit unit : Unit.values()) { + if (unit.getUnitString().toUpperCase(Locale.ENGLISH).equals(upperUnitString)) { + return new Duration(value, unit); + } + } + } + + throw new IllegalArgumentException("Unknown unit '" + unitString + "'"); + } + + @JsonValue + @Override + public String toString() + { + double value = getValue(displayUnit); + String integer = String.format(Locale.ENGLISH, "%d", (long) value); + String decimal = String.format(Locale.ENGLISH, "%.2f", value); + if (decimal.equals(integer + ".00")) { + return integer + displayUnit.getUnitString(); + } else { + return decimal + displayUnit.getUnitString(); + } + } + + @Override + public int compareTo(Duration o) + { + return Long.compare(usec, o.usec); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (!(obj instanceof Duration)) { + return false; + } + Duration o = (Duration) obj; + return this.usec == o.usec; + } + + @Override + public int hashCode() + { + return Objects.hashCode(usec); + } + + public enum Unit + { + USEC(1L, "usec"), + MSEC(1000L, "msec"), + SECONDS(1000L*1000L, "sec"), + MINUTES(60*1000L*1000L, "min"), + HOURS(60*60*1000L*1000L, "hour"), + DAYS(24*60*60*1000L*1000L, "day"); + + private final long factorToUsec; + private final String unitString; + + Unit(long factorToUsec, String unitString) + { + this.factorToUsec = factorToUsec; + this.unitString = unitString; + } + + long getFactorToUsec() + { + return factorToUsec; + } + + String getUnitString() + { + return unitString; + } + } +} diff --git a/embulk-input-s3/src/main/java/org/embulk/input/s3/FileList.java b/embulk-input-s3/src/main/java/org/embulk/input/s3/FileList.java index 2460853..45d327a 100644 --- a/embulk-input-s3/src/main/java/org/embulk/input/s3/FileList.java +++ b/embulk-input-s3/src/main/java/org/embulk/input/s3/FileList.java @@ -19,6 +19,7 @@ import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigSource; +import org.embulk.spi.unit.ByteSize; import com.google.common.base.Throwables; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; @@ -42,7 +43,7 @@ public interface Task // TODO support more algorithms to combine tasks @Config("min_task_size") @ConfigDefault("0") - long getMinTaskSize(); + ByteSize getMinTaskSize(); } public static class Entry @@ -74,7 +75,7 @@ public static class Builder private String last = null; private int limitCount = Integer.MAX_VALUE; - private long minTaskSize = 1; + private long minTaskSize = 0; private Pattern pathMatchPattern; private final ByteBuffer castBuffer = ByteBuffer.allocate(4); @@ -84,7 +85,7 @@ public Builder(Task task) this(); this.pathMatchPattern = Pattern.compile(task.getPathMatchPattern()); this.limitCount = task.getTotalFileCountLimit(); - this.minTaskSize = task.getMinTaskSize(); + this.minTaskSize = task.getMinTaskSize().getBytes(); } public Builder(ConfigSource config) @@ -92,7 +93,7 @@ public Builder(ConfigSource config) this(); this.pathMatchPattern = Pattern.compile(config.get(String.class, "path_match_pattern", ".*")); this.limitCount = config.get(int.class, "total_file_count_limit", Integer.MAX_VALUE); - this.minTaskSize = config.get(long.class, "min_task_size", 0L); + this.minTaskSize = config.get(ByteSize.class, "min_task_size", new ByteSize(0)).getBytes(); } public Builder() diff --git a/embulk-input-s3/src/test/java/org/embulk/input/s3/TestAwsClientConfiguration.java b/embulk-input-s3/src/test/java/org/embulk/input/s3/TestAwsClientConfiguration.java new file mode 100644 index 0000000..9cfe541 --- /dev/null +++ b/embulk-input-s3/src/test/java/org/embulk/input/s3/TestAwsClientConfiguration.java @@ -0,0 +1,108 @@ +package org.embulk.input.s3; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import org.embulk.EmbulkTestRuntime; +import org.embulk.config.ConfigException; +import org.embulk.config.ConfigSource; +import org.embulk.spi.Exec; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; + +import static org.embulk.input.s3.TestS3FileInputPlugin.parserConfig; +import static org.embulk.input.s3.TestS3FileInputPlugin.schemaConfig; +import static org.junit.Assert.assertEquals; + +public class TestAwsClientConfiguration +{ + @Rule + public EmbulkTestRuntime runtime = new EmbulkTestRuntime(); + + private S3FileInputPlugin plugin; + private ConfigSource config; + + @Before + public void createResources() + { + plugin = new S3FileInputPlugin(); + config = runtime.getExec().newConfigSource() + .set("type", "s3") + .set("bucket", "dummy") + .set("path_prefix", "dummy") + .set("parser", parserConfig(schemaConfig())); + } + + @Test + public void setOneParam() + { + config.setNested("client_config", Exec.newConfigSource() + .set("protocol", "HTTP") + .set("user_agent", "test_embulk_input_s3")); + + ClientConfiguration clientConfiguration = getClientConfiguration(); + + assertEquals(Protocol.HTTP, clientConfiguration.getProtocol()); + assertEquals("test_embulk_input_s3", clientConfiguration.getUserAgent()); + } + + @Test + public void setTwoParam() + { + config.setNested("client_config", Exec.newConfigSource() + .set("socket_send_buffer_size_hint", "1MB") + .set("socket_receive_buffer_size_hint", "2MB")); + + ClientConfiguration clientConfiguration = getClientConfiguration(); + + int[] socketBufferSizeHints = clientConfiguration.getSocketBufferSizeHints(); + assertEquals(1 << 20, socketBufferSizeHints[0]); + assertEquals(2 << 20, socketBufferSizeHints[1]); + } + + @Test + public void defaultValue() + { + ClientConfiguration clientConfiguration = getClientConfiguration(); + + assertEquals(3, clientConfiguration.getMaxErrorRetry()); + assertEquals(50, clientConfiguration.getMaxConnections()); + assertEquals(8*60*1000, clientConfiguration.getSocketTimeout()); + } + + @Test + public void secureRandom() + throws NoSuchAlgorithmException + { + config.setNested("client_config", Exec.newConfigSource() + .setNested("secure_random", Exec.newConfigSource() + .set("algorithm", "SHA1PRNG") + ) + ); + + ClientConfiguration clientConfiguration = getClientConfiguration(); + + assertEquals(SecureRandom.getInstance("SHA1PRNG").getAlgorithm(), clientConfiguration.getSecureRandom().getAlgorithm()); + } + + @Test(expected = ConfigException.class) + public void secureRandomNoSuchAlgorithmException() + { + config.setNested("client_config", Exec.newConfigSource() + .setNested("secure_random", Exec.newConfigSource() + .set("algorithm", "FOOOOOOO") + ) + ); + + ClientConfiguration clientConfiguration = getClientConfiguration(); + } + + private ClientConfiguration getClientConfiguration() + { + S3FileInputPlugin.S3PluginTask task = config.loadConfig(S3FileInputPlugin.S3PluginTask.class); + return plugin.getClientConfiguration(task); + } +} diff --git a/embulk-input-s3/src/test/java/org/embulk/input/s3/TestDuration.java b/embulk-input-s3/src/test/java/org/embulk/input/s3/TestDuration.java new file mode 100644 index 0000000..8d731f0 --- /dev/null +++ b/embulk-input-s3/src/test/java/org/embulk/input/s3/TestDuration.java @@ -0,0 +1,55 @@ +package org.embulk.input.s3; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import org.embulk.EmbulkTestRuntime; +import org.embulk.config.ConfigException; +import org.embulk.config.ConfigSource; +import org.embulk.spi.Exec; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; + +import static org.embulk.input.s3.TestS3FileInputPlugin.parserConfig; +import static org.embulk.input.s3.TestS3FileInputPlugin.schemaConfig; +import static org.junit.Assert.assertEquals; + +public class TestDuration +{ + @Test + public void testUnits() + { + assertDuration(1, "1usec"); + assertDuration(1000L, "1msec"); + assertDuration(1000*1000L, "1sec"); + assertDuration(60*1000*1000L, "1min"); + assertDuration(60*60*1000*1000L, "1hour"); + assertDuration(24*60*60*1000*1000L, "1day"); + + assertDuration(2, "2usec"); + assertDuration(2000L, "2msec"); + assertDuration(2000*1000L, "2sec"); + assertDuration(2*60*1000*1000L, "2min"); + assertDuration(2*60*60*1000*1000L, "2hour"); + assertDuration(2*24*60*60*1000*1000L, "2day"); + + assertDuration(0, "0.0usec"); + assertDuration(2400L, "2.4msec"); + assertDuration(2400*1000L, "2.4sec"); + assertDuration(144*1000*1000L, "2.4min"); + assertDuration(144*60*1000*1000L, "2.4hour"); + assertDuration(60*60*60*1000*1000L, "2.5day"); + } + + public void assertDuration(long usec, String str) + { + assertEquals(usec, Duration.parseDuration(str).roundTo(Duration.Unit.USEC)); + assertEquals(Duration.parseDuration(str), Duration.parseDuration(Duration.parseDuration(str).toString())); + if (usec % 1000 == 0) { + assertEquals(usec / 1000L, Duration.parseDuration(str).getMillis()); + } + } +}