From afbfbc5f9e57cf0691f6ba580b133cf45d9c0c47 Mon Sep 17 00:00:00 2001 From: Chetan Mehrotra Date: Wed, 21 Mar 2018 15:45:53 +0530 Subject: [PATCH 1/2] Support metadata for multipart upload --- .../s3mock/provider/FileProvider.scala | 10 ++- .../s3mock/provider/InMemoryProvider.scala | 10 ++- .../io/findify/s3mock/provider/Provider.scala | 2 +- .../findify/s3mock/route/MetadataUtil.scala | 75 +++++++++++++++++++ .../io/findify/s3mock/route/PutObject.scala | 64 +--------------- .../route/PutObjectMultipartStart.scala | 43 ++++++----- .../findify/s3mock/MultipartUploadTest.scala | 18 +++++ 7 files changed, 134 insertions(+), 88 deletions(-) create mode 100644 src/main/scala/io/findify/s3mock/route/MetadataUtil.scala diff --git a/src/main/scala/io/findify/s3mock/provider/FileProvider.scala b/src/main/scala/io/findify/s3mock/provider/FileProvider.scala index 108b7cf..99c67a2 100644 --- a/src/main/scala/io/findify/s3mock/provider/FileProvider.scala +++ b/src/main/scala/io/findify/s3mock/provider/FileProvider.scala @@ -90,11 +90,12 @@ class FileProvider(dir:String) extends Provider with LazyLogging { GetObjectData(file.byteArray, meta) } - override def putObjectMultipartStart(bucket:String, key:String):InitiateMultipartUploadResult = { + override def putObjectMultipartStart(bucket:String, key:String, metadata: ObjectMetadata):InitiateMultipartUploadResult = { val id = Math.abs(Random.nextLong()).toString val bucketFile = File(s"$dir/$bucket") if (!bucketFile.exists) throw NoSuchBucketException(bucket) File(s"$dir/.mp/$bucket/$key/$id/.keep").createIfNotExists(createParents = true) + metadataStore.put(bucket, key, metadata) logger.debug(s"starting multipart upload for s3://$bucket/$key") InitiateMultipartUploadResult(bucket, key, id) } @@ -116,8 +117,13 @@ class FileProvider(dir:String) extends Provider with LazyLogging { val data = parts.fold(Array[Byte]())(_ ++ _) file.writeBytes(data.toIterator) File(s"$dir/.mp/$bucket/$key").delete() + val hash = file.md5 + metadataStore.get(bucket, key).foreach {m => + m.setContentMD5(hash) + m.setLastModified(org.joda.time.DateTime.now().toDate) + } logger.debug(s"completed multipart upload for s3://$bucket/$key") - CompleteMultipartUploadResult(bucket, key, file.md5) + CompleteMultipartUploadResult(bucket, key, hash) } override def copyObject(sourceBucket: String, sourceKey: String, destBucket: String, destKey: String, newMeta: Option[ObjectMetadata] = None): CopyObjectResult = { diff --git a/src/main/scala/io/findify/s3mock/provider/InMemoryProvider.scala b/src/main/scala/io/findify/s3mock/provider/InMemoryProvider.scala index a3b1041..a355802 100644 --- a/src/main/scala/io/findify/s3mock/provider/InMemoryProvider.scala +++ b/src/main/scala/io/findify/s3mock/provider/InMemoryProvider.scala @@ -101,11 +101,12 @@ class InMemoryProvider extends Provider with LazyLogging { } } - override def putObjectMultipartStart(bucket: String, key: String): InitiateMultipartUploadResult = { + override def putObjectMultipartStart(bucket: String, key: String, metadata: ObjectMetadata): InitiateMultipartUploadResult = { bucketDataStore.get(bucket) match { case Some(_) => val id = Math.abs(Random.nextLong()).toString multipartTempStore.putIfAbsent(id, new mutable.TreeSet) + metadataStore.put(bucket, key, metadata) logger.debug(s"starting multipart upload for s3://$bucket/$key") InitiateMultipartUploadResult(bucket, key, id) case None => throw NoSuchBucketException(bucket) @@ -128,7 +129,12 @@ class InMemoryProvider extends Provider with LazyLogging { bucketContent.keysInBucket.put(key, KeyContents(DateTime.now, completeBytes)) multipartTempStore.remove(uploadId) logger.debug(s"completed multipart upload for s3://$bucket/$key") - CompleteMultipartUploadResult(bucket, key, DigestUtils.md5Hex(completeBytes)) + val hash = DigestUtils.md5Hex(completeBytes) + metadataStore.get(bucket, key).foreach {m => + m.setContentMD5(hash) + m.setLastModified(org.joda.time.DateTime.now().toDate) + } + CompleteMultipartUploadResult(bucket, key, hash) case None => throw NoSuchBucketException(bucket) } } diff --git a/src/main/scala/io/findify/s3mock/provider/Provider.scala b/src/main/scala/io/findify/s3mock/provider/Provider.scala index 850be58..4dc97c7 100644 --- a/src/main/scala/io/findify/s3mock/provider/Provider.scala +++ b/src/main/scala/io/findify/s3mock/provider/Provider.scala @@ -18,7 +18,7 @@ trait Provider { def createBucket(name:String, bucketConfig:CreateBucketConfiguration):CreateBucket def putObject(bucket:String, key:String, data:Array[Byte], metadata: ObjectMetadata):Unit def getObject(bucket:String, key:String): GetObjectData - def putObjectMultipartStart(bucket:String, key:String):InitiateMultipartUploadResult + def putObjectMultipartStart(bucket:String, key:String, metadata: ObjectMetadata):InitiateMultipartUploadResult def putObjectMultipartPart(bucket:String, key:String, partNumber:Int, uploadId:String, data:Array[Byte]):Unit def putObjectMultipartComplete(bucket:String, key:String, uploadId:String, request:CompleteMultipartUpload):CompleteMultipartUploadResult def deleteObject(bucket:String, key:String):Unit diff --git a/src/main/scala/io/findify/s3mock/route/MetadataUtil.scala b/src/main/scala/io/findify/s3mock/route/MetadataUtil.scala new file mode 100644 index 0000000..3c71a9e --- /dev/null +++ b/src/main/scala/io/findify/s3mock/route/MetadataUtil.scala @@ -0,0 +1,75 @@ +package io.findify.s3mock.route + +import java.lang.Iterable +import java.util + +import akka.http.javadsl.model.HttpHeader +import akka.http.scaladsl.model.HttpRequest +import com.amazonaws.AmazonClientException +import com.amazonaws.services.s3.Headers +import com.amazonaws.services.s3.internal.ServiceUtils +import com.amazonaws.services.s3.model.ObjectMetadata +import com.amazonaws.util.{DateUtils, StringUtils} +import com.typesafe.scalalogging.LazyLogging + +import scala.collection.JavaConverters._ + +object MetadataUtil extends LazyLogging { + + def populateObjectMetadata(request: HttpRequest): ObjectMetadata = { + val metadata = new ObjectMetadata() + val ignoredHeaders: util.HashSet[String] = new util.HashSet[String]() + ignoredHeaders.add(Headers.DATE) + ignoredHeaders.add(Headers.SERVER) + ignoredHeaders.add(Headers.REQUEST_ID) + ignoredHeaders.add(Headers.EXTENDED_REQUEST_ID) + ignoredHeaders.add(Headers.CLOUD_FRONT_ID) + ignoredHeaders.add(Headers.CONNECTION) + + val headers: Iterable[HttpHeader] = request.getHeaders() + for (header <- headers.asScala) { + var key: String = header.name() + if (StringUtils.beginsWithIgnoreCase(key, Headers.S3_USER_METADATA_PREFIX)) { + key = key.substring(Headers.S3_USER_METADATA_PREFIX.length) + metadata.addUserMetadata(key, header.value()) + } + // else if (ignoredHeaders.contains(key)) { + // ignore... + // } + else if (key.equalsIgnoreCase(Headers.LAST_MODIFIED)) try + metadata.setHeader(key, ServiceUtils.parseRfc822Date(header.value())) + + catch { + case pe: Exception => logger.warn("Unable to parse last modified date: " + header.value(), pe) + } + else if (key.equalsIgnoreCase(Headers.CONTENT_LENGTH)) try + metadata.setHeader(key, java.lang.Long.parseLong(header.value())) + + catch { + case nfe: NumberFormatException => throw new AmazonClientException("Unable to parse content length. Header 'Content-Length' has corrupted data" + nfe.getMessage, nfe) + } + else if (key.equalsIgnoreCase(Headers.ETAG)) metadata.setHeader(key, ServiceUtils.removeQuotes(header.value())) + else if (key.equalsIgnoreCase(Headers.EXPIRES)) try + metadata.setHttpExpiresDate(DateUtils.parseRFC822Date(header.value())) + + catch { + case pe: Exception => logger.warn("Unable to parse http expiration date: " + header.value(), pe) + } + // else if (key.equalsIgnoreCase(Headers.EXPIRATION)) new ObjectExpirationHeaderHandler[ObjectMetadata]().handle(metadata, response) + // else if (key.equalsIgnoreCase(Headers.RESTORE)) new ObjectRestoreHeaderHandler[ObjectRestoreResult]().handle(metadata, response) + // else if (key.equalsIgnoreCase(Headers.REQUESTER_CHARGED_HEADER)) new S3RequesterChargedHeaderHandler[S3RequesterChargedResult]().handle(metadata, response) + else if (key.equalsIgnoreCase(Headers.S3_PARTS_COUNT)) try + metadata.setHeader(key, header.value().toInt) + + catch { + case nfe: NumberFormatException => throw new AmazonClientException("Unable to parse part count. Header x-amz-mp-parts-count has corrupted data" + nfe.getMessage, nfe) + } + else metadata.setHeader(key, header.value()) + } + + if(metadata.getContentType == null){ + metadata.setContentType(request.entity.getContentType.toString) + } + metadata + } +} diff --git a/src/main/scala/io/findify/s3mock/route/PutObject.scala b/src/main/scala/io/findify/s3mock/route/PutObject.scala index 61f6464..0eb2534 100644 --- a/src/main/scala/io/findify/s3mock/route/PutObject.scala +++ b/src/main/scala/io/findify/s3mock/route/PutObject.scala @@ -1,26 +1,17 @@ package io.findify.s3mock.route -import java.lang.Iterable -import java.util - -import akka.http.javadsl.model.HttpHeader import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes} import akka.http.scaladsl.server.Directives._ import akka.stream.Materializer import akka.stream.scaladsl.Sink import akka.util.ByteString -import com.amazonaws.AmazonClientException -import com.amazonaws.services.s3.Headers -import com.amazonaws.services.s3.internal.ServiceUtils import com.amazonaws.services.s3.model.ObjectMetadata -import com.amazonaws.util.{DateUtils, StringUtils} import com.typesafe.scalalogging.LazyLogging import io.findify.s3mock.S3ChunkedProtocolStage import io.findify.s3mock.error.{InternalErrorException, NoSuchBucketException} import io.findify.s3mock.provider.Provider import org.apache.commons.codec.digest.DigestUtils -import scala.collection.JavaConverters._ import scala.util.{Failure, Success, Try} /** @@ -95,60 +86,7 @@ case class PutObject(implicit provider:Provider, mat:Materializer) extends LazyL } private def populateObjectMetadata(request: HttpRequest, bytes: Array[Byte]): ObjectMetadata = { - val metadata = new ObjectMetadata() - val ignoredHeaders: util.HashSet[String] = new util.HashSet[String]() - ignoredHeaders.add(Headers.DATE) - ignoredHeaders.add(Headers.SERVER) - ignoredHeaders.add(Headers.REQUEST_ID) - ignoredHeaders.add(Headers.EXTENDED_REQUEST_ID) - ignoredHeaders.add(Headers.CLOUD_FRONT_ID) - ignoredHeaders.add(Headers.CONNECTION) - - val headers: Iterable[HttpHeader] = request.getHeaders() - for (header <- headers.asScala) { - var key: String = header.name() - if (StringUtils.beginsWithIgnoreCase(key, Headers.S3_USER_METADATA_PREFIX)) { - key = key.substring(Headers.S3_USER_METADATA_PREFIX.length) - metadata.addUserMetadata(key, header.value()) - } - // else if (ignoredHeaders.contains(key)) { - // ignore... - // } - else if (key.equalsIgnoreCase(Headers.LAST_MODIFIED)) try - metadata.setHeader(key, ServiceUtils.parseRfc822Date(header.value())) - - catch { - case pe: Exception => logger.warn("Unable to parse last modified date: " + header.value(), pe) - } - else if (key.equalsIgnoreCase(Headers.CONTENT_LENGTH)) try - metadata.setHeader(key, java.lang.Long.parseLong(header.value())) - - catch { - case nfe: NumberFormatException => throw new AmazonClientException("Unable to parse content length. Header 'Content-Length' has corrupted data" + nfe.getMessage, nfe) - } - else if (key.equalsIgnoreCase(Headers.ETAG)) metadata.setHeader(key, ServiceUtils.removeQuotes(header.value())) - else if (key.equalsIgnoreCase(Headers.EXPIRES)) try - metadata.setHttpExpiresDate(DateUtils.parseRFC822Date(header.value())) - - catch { - case pe: Exception => logger.warn("Unable to parse http expiration date: " + header.value(), pe) - } - // else if (key.equalsIgnoreCase(Headers.EXPIRATION)) new ObjectExpirationHeaderHandler[ObjectMetadata]().handle(metadata, response) - // else if (key.equalsIgnoreCase(Headers.RESTORE)) new ObjectRestoreHeaderHandler[ObjectRestoreResult]().handle(metadata, response) - // else if (key.equalsIgnoreCase(Headers.REQUESTER_CHARGED_HEADER)) new S3RequesterChargedHeaderHandler[S3RequesterChargedResult]().handle(metadata, response) - else if (key.equalsIgnoreCase(Headers.S3_PARTS_COUNT)) try - metadata.setHeader(key, header.value().toInt) - - catch { - case nfe: NumberFormatException => throw new AmazonClientException("Unable to parse part count. Header x-amz-mp-parts-count has corrupted data" + nfe.getMessage, nfe) - } - else metadata.setHeader(key, header.value()) - } - - if(metadata.getContentType == null){ - metadata.setContentType(request.entity.getContentType.toString) - } - metadata.getRawMetadata + val metadata = MetadataUtil.populateObjectMetadata(request) metadata.setContentMD5(DigestUtils.md5Hex(bytes)) metadata } diff --git a/src/main/scala/io/findify/s3mock/route/PutObjectMultipartStart.scala b/src/main/scala/io/findify/s3mock/route/PutObjectMultipartStart.scala index 943a892..cc09b0d 100644 --- a/src/main/scala/io/findify/s3mock/route/PutObjectMultipartStart.scala +++ b/src/main/scala/io/findify/s3mock/route/PutObjectMultipartStart.scala @@ -15,27 +15,30 @@ import scala.util.{Failure, Success, Try} */ case class PutObjectMultipartStart(implicit provider:Provider) extends LazyLogging { def route(bucket:String, path:String) = post { - parameter('uploads) { mp => - complete { - logger.info(s"multipart upload start to $bucket/$path") - Try(provider.putObjectMultipartStart(bucket, path)) match { - case Success(result) => - HttpResponse( - StatusCodes.OK, - entity = HttpEntity( - ContentTypes.`application/octet-stream`, result.toXML.toString().getBytes(StandardCharsets.UTF_8) + extractRequest { request => + parameter('uploads) { mp => + complete { + val metadata = MetadataUtil.populateObjectMetadata(request) + logger.info(s"multipart upload start to $bucket/$path") + Try(provider.putObjectMultipartStart(bucket, path, metadata)) match { + case Success(result) => + HttpResponse( + StatusCodes.OK, + entity = HttpEntity( + ContentTypes.`application/octet-stream`, result.toXML.toString().getBytes(StandardCharsets.UTF_8) + ) ) - ) - case Failure(e: NoSuchBucketException) => - HttpResponse( - StatusCodes.NotFound, - entity = e.toXML.toString() - ) - case Failure(t) => - HttpResponse( - StatusCodes.InternalServerError, - entity = InternalErrorException(t).toXML.toString() - ) + case Failure(e: NoSuchBucketException) => + HttpResponse( + StatusCodes.NotFound, + entity = e.toXML.toString() + ) + case Failure(t) => + HttpResponse( + StatusCodes.InternalServerError, + entity = InternalErrorException(t).toXML.toString() + ) + } } } } diff --git a/src/test/scala/io/findify/s3mock/MultipartUploadTest.scala b/src/test/scala/io/findify/s3mock/MultipartUploadTest.scala index 1a0c922..7c0f132 100644 --- a/src/test/scala/io/findify/s3mock/MultipartUploadTest.scala +++ b/src/test/scala/io/findify/s3mock/MultipartUploadTest.scala @@ -84,5 +84,23 @@ class MultipartUploadTest extends S3MockTest { exc.getStatusCode shouldBe 404 exc.getErrorCode shouldBe "NoSuchBucket" } + + it should "upload multipart with metadata" in { + s3.createBucket("getput") + val metadata: ObjectMetadata = new ObjectMetadata() + metadata.setContentType("application/json") + metadata.addUserMetadata("metamaic", "maic") + val init = s3.initiateMultipartUpload(new InitiateMultipartUploadRequest("getput", "foo4", metadata)) + val p1 = s3.uploadPart(new UploadPartRequest().withBucketName("getput").withPartSize(10).withKey("foo4").withPartNumber(1).withUploadId(init.getUploadId).withInputStream(new ByteArrayInputStream("hellohello".getBytes()))) + val p2 = s3.uploadPart(new UploadPartRequest().withBucketName("getput").withPartSize(10).withKey("foo4").withPartNumber(2).withUploadId(init.getUploadId).withInputStream(new ByteArrayInputStream("worldworld".getBytes()))) + val result = s3.completeMultipartUpload(new CompleteMultipartUploadRequest("getput", "foo4", init.getUploadId, List(p1.getPartETag, p2.getPartETag).asJava)) + result.getKey shouldBe "foo4" + val s3Object = s3.getObject("getput", "foo4") + getContent(s3Object) shouldBe "hellohelloworldworld" + + val actualMetadata: ObjectMetadata = s3Object.getObjectMetadata + actualMetadata.getContentType shouldBe "application/json" + actualMetadata.getUserMetadata.get("metamaic") shouldBe "maic" + } } } From c2e69456e9c4da2d7d064f2bdb5637160c2f9695 Mon Sep 17 00:00:00 2001 From: Eric Peters Date: Sun, 24 Mar 2019 21:13:12 -0700 Subject: [PATCH 2/2] Upgrade dependencies --- build.sbt | 22 +++++++++++----------- project/build.properties | 2 +- project/plugins.sbt | 13 ++++++------- 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/build.sbt b/build.sbt index 092ca9b..8ebc5d9 100644 --- a/build.sbt +++ b/build.sbt @@ -4,11 +4,11 @@ version := "0.2.5" organization := "io.findify" -scalaVersion := "2.12.4" +scalaVersion := "2.12.8" crossScalaVersions := Seq("2.11.11", "2.12.4") -val akkaVersion = "2.5.11" +val akkaVersion = "2.5.21" licenses := Seq("MIT" -> url("https://opensource.org/licenses/MIT")) @@ -16,16 +16,16 @@ homepage := Some(url("https://github.com/findify/s3mock")) libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-stream" % akkaVersion, - "com.typesafe.akka" %% "akka-http" % "10.1.0", + "com.typesafe.akka" %% "akka-http" % "10.1.8", "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % "test", - "org.scala-lang.modules" %% "scala-xml" % "1.1.0", - "com.github.pathikrit" %% "better-files" % "3.4.0", - "com.typesafe.scala-logging" %% "scala-logging" % "3.8.0", - "com.amazonaws" % "aws-java-sdk-s3" % "1.11.294", - "org.scalatest" %% "scalatest" % "3.0.5" % "test", + "org.scala-lang.modules" %% "scala-xml" % "1.1.1", + "com.github.pathikrit" %% "better-files" % "3.7.1", + "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2", + "com.amazonaws" % "aws-java-sdk-s3" % "1.11.524", + "org.scalatest" %% "scalatest" % "3.0.7" % "test", "ch.qos.logback" % "logback-classic" % "1.2.3" % "test", - "org.iq80.leveldb" % "leveldb" % "0.10", - "com.lightbend.akka" %% "akka-stream-alpakka-s3" % "0.17" % "test" + "org.iq80.leveldb" % "leveldb" % "0.11", + "com.lightbend.akka" %% "akka-stream-alpakka-s3" % "0.20" % "test" ) parallelExecution in Test := false @@ -59,7 +59,7 @@ mainClass in assembly := Some("io.findify.s3mock.Main") test in assembly := {} dockerfile in docker := new Dockerfile { - from("openjdk:9.0.1-11-jre-slim") + from("openjdk:11-slim") expose(8001) add(assembly.value, "/app/s3mock.jar") entryPoint("java", "-Xmx128m", "-jar", "--add-modules", "java.xml.bind", "/app/s3mock.jar") diff --git a/project/build.properties b/project/build.properties index 6f5ae37..7609b47 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version = 1.1.0 \ No newline at end of file +sbt.version = 1.2.8 \ No newline at end of file diff --git a/project/plugins.sbt b/project/plugins.sbt index 4224453..b38dbd5 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,10 +1,9 @@ logLevel := Level.Warn -addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.3.3") -addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.0") -addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.3") -addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.1") -addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.7") -addSbtPlugin("se.marcuslonnberg" % "sbt-docker" % "1.5.0") -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6") +addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.4.0") +addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.2") +addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.5") +addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.2-1") +addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.10") addSbtPlugin("se.marcuslonnberg" % "sbt-docker" % "1.5.0") +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")