Skip to content

Commit d2b0ef2

Browse files
committed
Improved implementation for FutureTransfer.listenFor
Cast to internal type `AbstractTransfer` so that a low-level state change listener can be attached. In light of issue #30, this provides a more foolproof implementation of listening to transfer events. There is now a three-way race to complete the promise that signals the completion/termination of the transfer. This adds an integration test for s3, using the fakes3 ruby gem. closes #30
1 parent 087cbdf commit d2b0ef2

13 files changed

+260
-18
lines changed

.gitignore

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
*.class
22
*.log
33

4-
/dynamodb
4+
/dynamodb-local
5+
/fakes3
6+
7+
/Gemfile.lock
58

69
# sbt specific
710
dist/*

.travis.yml

+1
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,6 @@ jdk:
99
cache:
1010
directories:
1111
- $HOME/.ivy2/cache
12+
install: bundle install
1213
script:
1314
- sbt ++$TRAVIS_SCALA_VERSION --warn update compile doc awsWrapTest/it:compile && sbt ++$TRAVIS_SCALA_VERSION awsWrapTest/it:test

Gemfile

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
source 'https://rubygems.org'
2+
gem 'fakes3'

integration/build.sbt

+5
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ name := "aws-wrap-test"
44

55
libraryDependencies ++= Seq(
66
Dependencies.Compile.awsJavaSDK_dynamodb % "it",
7+
Dependencies.Compile.awsJavaSDK_s3 % "it",
78
Dependencies.Compile.jodaTime % "it",
89
Dependencies.Compile.jodaConvert % "it",
910
Dependencies.Compile.logback % "it",
@@ -20,10 +21,14 @@ testOptions in IntegrationTest += Tests.Setup { () =>
2021
println("Start DynamoDB Local")
2122
System.setProperty("DynamoDB.localMode", "true")
2223
Process("bash start-dynamodb-local.sh").!
24+
println("Start fakes3")
25+
Process("bash start-fakes3.sh").!
2326
}
2427

2528
testOptions in IntegrationTest += Tests.Cleanup { () =>
2629
println("Stop DynamoDB Local")
2730
System.clearProperty("DynamoDB.localMode")
2831
Process("bash stop-dynamodb-local.sh").!
32+
println("Stop fakes3")
33+
Process("bash stop-fakes3.sh").!
2934
}

integration/src/it/scala/dynamodb/AwaitHelper.scala integration/src/it/scala/AwaitHelper.scala

+1-2
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,9 @@
1515
* limitations under the License.
1616
*/
1717

18-
package com.github.dwhjames.awswrap.dynamodb
18+
package com.github.dwhjames.awswrap
1919

2020
import scala.concurrent._
21-
import scala.concurrent.ExecutionContext.Implicits.global
2221
import scala.concurrent.duration._
2322

2423
trait AwaitHelper {

integration/src/it/scala/dynamodb/DynamoDBClient.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
* limitations under the License.
1616
*/
1717

18-
package com.github.dwhjames.awswrap.dynamodb
18+
package com.github.dwhjames.awswrap
19+
package dynamodb
1920

2021
import scala.concurrent.ExecutionContext.Implicits.global
2122
import scala.concurrent.duration._
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2015 Daniel W. H. James
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.github.dwhjames.awswrap
18+
package s3
19+
20+
import scala.concurrent.ExecutionContext.Implicits.global
21+
22+
import java.io.File
23+
24+
import org.scalatest.{ FlatSpec, Matchers }
25+
26+
import com.amazonaws.AmazonClientException
27+
28+
29+
class FutureTransferSpec
30+
extends FlatSpec
31+
with Matchers
32+
with S3ClientHelper
33+
{
34+
35+
val bucketName = "my-s3-bucket-98bfdf06-9475-4d1a-a235-e0eddd5859f9"
36+
override val bucketNames = Seq(bucketName)
37+
val objectKey = "test"
38+
39+
override def afterAll(): Unit = {
40+
try {
41+
client.client.deleteObject(bucketName, objectKey)
42+
} finally {
43+
super.afterAll()
44+
}
45+
}
46+
47+
"FutureTransfer" should "upload a file" in {
48+
val file = new File(
49+
this.getClass()
50+
.getClassLoader()
51+
.getResource("logback-test.xml")
52+
.toURI())
53+
54+
val upload = transferManager.upload(bucketName, objectKey, file)
55+
56+
await {
57+
FutureTransfer.listenFor(upload)
58+
}
59+
upload.waitForUploadResult()
60+
()
61+
}
62+
63+
it should "download a file" in {
64+
val file = File.createTempFile("logback-test", ".xml")
65+
66+
try {
67+
val download = transferManager.download(bucketName, objectKey, file)
68+
69+
await {
70+
FutureTransfer.listenFor(download)
71+
}
72+
download.waitForCompletion()
73+
()
74+
} finally {
75+
file.delete()
76+
()
77+
}
78+
}
79+
80+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2015 Daniel W. H. James
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.github.dwhjames.awswrap
18+
package s3
19+
20+
import org.scalatest.{Suite, BeforeAndAfterAll}
21+
22+
import com.amazonaws.auth.BasicAWSCredentials
23+
import com.amazonaws.services.s3._
24+
import com.amazonaws.services.s3.transfer._
25+
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
30+
trait S3ClientHelper
31+
extends BeforeAndAfterAll
32+
with AwaitHelper
33+
{ self: Suite =>
34+
35+
private val logger: Logger = LoggerFactory.getLogger(self.getClass)
36+
37+
val client = {
38+
val c = new AmazonS3ScalaClient(new BasicAWSCredentials("FAKE_ACCESS_KEY", "FAKE_SECRET_KEY"))
39+
c.client.setEndpoint("http://localhost:4000")
40+
c.client.setS3ClientOptions(new S3ClientOptions().withPathStyleAccess(true))
41+
c
42+
}
43+
44+
val transferManager = new TransferManager(client.client)
45+
46+
val bucketNames: Seq[String]
47+
48+
override def beforeAll(): Unit = {
49+
bucketNames foreach { name =>
50+
logger.info(s"Creating bucket $name")
51+
client.client.createBucket(name)
52+
}
53+
54+
super.beforeAll()
55+
}
56+
57+
override def afterAll(): Unit = {
58+
try {
59+
super.afterAll()
60+
bucketNames foreach { name =>
61+
logger.info(s"Deleting bucket $name")
62+
client.client.deleteBucket(name)
63+
}
64+
} finally {
65+
transferManager.shutdownNow()
66+
client.shutdown()
67+
}
68+
}
69+
}

src/main/scala/s3/s3.scala

+54-10
Original file line numberDiff line numberDiff line change
@@ -508,8 +508,8 @@ class AmazonS3ScalaClient(
508508
* interface, and [[FutureTransfer.listenFor]] adapts this interface to
509509
* Scala futures.
510510
*
511-
* @see [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/transfer/TransferManager.html TransferManager]]
512-
* @see [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/transfer/Transfer.html Transfer]]
511+
* @see [[com.amazonaws.services.s3.transfer.TransferManager TransferManager]]
512+
* @see [[com.amazonaws.services.s3.transfer.Transfer Transfer]]
513513
*/
514514
object FutureTransfer {
515515

@@ -518,7 +518,7 @@ object FutureTransfer {
518518
/**
519519
* Attach a listener to an S3 Transfer and return it as a Future.
520520
*
521-
* This helper method attaches a progress listener to the given
521+
* This helper method attaches a progress and state change listeners to the given
522522
* Transfer object. The returned future is completed with the
523523
* same transfer when the transfer is ‘done’ (canceled, completed,
524524
* or failed). The future will always been completed successfully
@@ -536,14 +536,33 @@ object FutureTransfer {
536536
* @param transfer
537537
* an S3 Transfer to listen for progress.
538538
* @return the transfer in a future.
539-
* @see [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/transfer/Transfer.html Transfer]]
540-
* @see [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/event/ProgressListener.html ProgressListener]]
539+
* @see [[com.amazonaws.services.s3.transfer.Transfer Transfer]]
540+
* @see [[com.amazonaws.event.ProgressListener ProgressListener]]
541541
*/
542542
def listenFor[T <: Transfer](transfer: T): Future[transfer.type] = {
543+
import com.amazonaws.services.s3.transfer.internal.{ AbstractTransfer, TransferStateChangeListener }
543544
val transferDescription = transfer.getDescription
544545
def debugLog(eventType: String): Unit = {
545546
logger.debug(s"$eventType : $transferDescription")
546547
}
548+
def logTransferState(state: Transfer.TransferState): Unit = {
549+
if (logger.isDebugEnabled) {
550+
state match {
551+
case Transfer.TransferState.Waiting =>
552+
debugLog("Waiting")
553+
case Transfer.TransferState.InProgress =>
554+
debugLog("InProgress")
555+
case Transfer.TransferState.Completed =>
556+
debugLog("Completed")
557+
case Transfer.TransferState.Canceled =>
558+
debugLog("Canceled")
559+
case Transfer.TransferState.Failed =>
560+
debugLog("Failed")
561+
case _ =>
562+
logger.warn(s"unrecognized transfer state for transfer $transferDescription")
563+
}
564+
}
565+
}
547566
def logProgressEvent(progressEvent: ProgressEvent): Unit = {
548567
if (logger.isDebugEnabled) {
549568
progressEvent.getEventType match {
@@ -601,6 +620,35 @@ object FutureTransfer {
601620

602621
val p = Promise[transfer.type]
603622

623+
if (transfer.isInstanceOf[AbstractTransfer]) {
624+
/* Attach a state change listener to the transfer.
625+
* At this point, the transfer is already in progress
626+
* and may even have already completed. We will have
627+
* missed any state change events that have already been
628+
* fired, including the completion event!
629+
*/
630+
transfer.asInstanceOf[AbstractTransfer].addStateChangeListener(new TransferStateChangeListener {
631+
/* Note that the transferStateChanged will be called in the Java SDK’s
632+
* special thread for callbacks, so any blocking calls here have
633+
* the potential to induce deadlock.
634+
*/
635+
override def transferStateChanged(t: Transfer, state: Transfer.TransferState): Unit = {
636+
logTransferState(state)
637+
638+
if (state == Transfer.TransferState.Completed ||
639+
state == Transfer.TransferState.Canceled ||
640+
state == Transfer.TransferState.Failed) {
641+
val success = p trySuccess transfer
642+
if (logger.isDebugEnabled) {
643+
if (success) {
644+
logger.debug(s"promise successfully completed from transfer state change listener for $transferDescription")
645+
}
646+
}
647+
}
648+
}
649+
})
650+
}
651+
604652
/* Attach a progress listener to the transfer.
605653
* At this point, the transfer is already in progress
606654
* and may even have already completed. We will have
@@ -624,8 +672,6 @@ object FutureTransfer {
624672
if (logger.isDebugEnabled) {
625673
if (success) {
626674
logger.debug(s"promise successfully completed from progress listener for $transferDescription")
627-
} else {
628-
logger.debug(s"promise was found to be already completed from progress listener for $transferDescription")
629675
}
630676
}
631677
}
@@ -639,9 +685,7 @@ object FutureTransfer {
639685
val success = p trySuccess transfer
640686
if (logger.isDebugEnabled) {
641687
if (success) {
642-
logger.debug(s"promise successfully completed outside of progress listener for $transferDescription")
643-
} else {
644-
logger.debug(s"promise was found to be already completed outside of progress listener for $transferDescription")
688+
logger.debug(s"promise successfully completed from outside of callbacks for $transferDescription")
645689
}
646690
}
647691
}

start-dynamodb-local.sh

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
set -euo pipefail
55
IFS=$'\n\t'
66

7-
WORKING_DIR="dynamodb"
7+
WORKING_DIR="dynamodb-local"
88

99
mkdir -p $WORKING_DIR
1010
cd $WORKING_DIR
@@ -26,7 +26,7 @@ mkdir -p $LOG_DIR
2626
echo "DynamoDB Local output will save to ${WORKING_DIR}/${LOG_DIR}/"
2727

2828
NOW=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
29-
nohup java -Djava.library.path=./DynamoDBLocal_lib -jar DynamoDBLocal.jar -port 8000 -inMemory 1>"${LOG_DIR}/${NOW}.out.log" 2>"${LOG_DIR}/${NOW}.err.log" &
29+
exec java -Djava.library.path=./DynamoDBLocal_lib -jar DynamoDBLocal.jar -port 8000 -inMemory 1>"${LOG_DIR}/${NOW}.out.log" 2>"${LOG_DIR}/${NOW}.err.log" &
3030
PID=$!
3131

3232
echo "DynamoDB Local started with pid ${PID}"

start-fakes3.sh

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#!/usr/bin/env bash
2+
3+
# http://redsymbol.net/articles/unofficial-bash-strict-mode/
4+
set -euo pipefail
5+
IFS=$'\n\t'
6+
7+
bundle check
8+
9+
WORKING_DIR="fakes3"
10+
DATA_DIR="${WORKING_DIR}/data"
11+
LOG_DIR="${WORKING_DIR}/logs"
12+
mkdir -p "$DATA_DIR" "$LOG_DIR"
13+
14+
PORT="4000"
15+
NOW=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
16+
exec bundle exec fakes3 -r "$DATA_DIR" -p "$PORT" 1>"${LOG_DIR}/${NOW}.out.log" 2>"${LOG_DIR}/${NOW}.err.log" &
17+
PID=$!
18+
19+
echo "fakes3 started with pid ${PID}"
20+
echo $PID >"${WORKING_DIR}/PID"
21+
22+
echo "Pausing for 2 seconds..."
23+
sleep 2

stop-dynamodb-local.sh

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/bin/bash
22

3-
WORKING_DIR="dynamodb"
3+
WORKING_DIR="dynamodb-local"
44

55
PID_FILE="${WORKING_DIR}/PID"
66

@@ -10,7 +10,7 @@ then
1010
exit 1
1111
else
1212
PID=$(cat $PID_FILE)
13-
kill -s KILL $PID
13+
kill -s TERM $PID
1414
rm $PID_FILE
1515
echo "DynamoDB Local stopped"
1616
fi

0 commit comments

Comments
 (0)