Skip to content

Commit d0386e1

Browse files
committed
Merge pull request #41 from etaty/feature/scan-progressively
progressive scan
2 parents c28145f + 1102797 commit d0386e1

File tree

2 files changed

+65
-29
lines changed

2 files changed

+65
-29
lines changed

integration/src/it/scala/dynamodb/ReadsOnHashKeyTablesSpec.scala

+18
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,31 @@ class ReadsOnHashKeyTableSpec
8585
val forumScanOnceLimit = await {
8686
mapper.scanOnce[Forum](limit = sampleForums.size)
8787
}
88+
89+
val forumProgressively = await {
90+
mapper.scanProgressively[Forum](limit = sampleForums.size).map(_._2)
91+
}
92+
93+
val forumProgressivelyLimit = await {
94+
for {
95+
(lastEvaluatedKey, head) <- mapper.scanProgressively[Forum](limit = 1)
96+
(lastEvaluatedKeyNone, next) <- mapper.scanProgressively[Forum](lastEvaluatedKey = lastEvaluatedKey)
97+
} yield {
98+
lastEvaluatedKey should not be empty
99+
lastEvaluatedKeyNone shouldEqual None
100+
head ++ next
101+
}
102+
}
103+
88104
val forumBatch = await {
89105
mapper.batchLoadByKeys[Forum](sampleForums map (_.name))
90106
}
91107

92108
forumScan should have size (sampleForums.size.toLong)
93109
forumScanOnce should have size (sampleForums.size.toLong)
94110
forumScanOnceLimit should have size (sampleForums.size.toLong)
111+
forumProgressively should have size (sampleForums.size.toLong)
112+
forumProgressivelyLimit should have size (sampleForums.size.toLong)
95113
forumBatch should have size (sampleForums.size.toLong)
96114

97115
for (forum <- sampleForums) {

src/main/scala/dynamodb/mapper.scala

+47-29
Original file line numberDiff line numberDiff line change
@@ -588,33 +588,19 @@ trait AmazonDynamoDBScalaMapper {
588588
def scan[T](
589589
scanFilter: Map[String, Condition] = Map.empty
590590
)(implicit serializer: DynamoDBSerializer[T]): Future[Seq[T]] = {
591-
val scanRequest =
592-
new ScanRequest()
593-
.withTableName(tableName)
594-
.withScanFilter(scanFilter.asJava)
595-
if (logger.isDebugEnabled)
596-
scanRequest.setReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
597-
598591
val builder = Seq.newBuilder[T]
599592

600-
def local(lastKey: Option[DynamoDBKey] = None): Future[Unit] =
601-
client.scan(
602-
scanRequest.withExclusiveStartKey(lastKey.orNull)
603-
) flatMap { result =>
604-
if (logger.isDebugEnabled)
605-
logger.debug(s"scan() ConsumedCapacity = ${result.getConsumedCapacity()}")
606-
607-
builder ++= result.getItems.asScala.view map { item =>
608-
serializer.fromAttributeMap(item.asScala)
609-
}
610-
611-
Option { result.getLastEvaluatedKey } match {
612-
case None => Future.successful(())
613-
case optKey => local(optKey)
614-
}
593+
def local(lastKey: Option[DynamoDBKey] = None): Future[Seq[T]] =
594+
scanProgressively(scanFilter, lastEvaluatedKey = lastKey).flatMap {
595+
case (key, result) =>
596+
builder ++= result
597+
key match {
598+
case None => Future.successful(builder.result())
599+
case optKey => local(optKey)
600+
}
615601
}
616602

617-
local() map { _ => builder.result }
603+
local()
618604
}
619605

620606
/**
@@ -634,27 +620,59 @@ trait AmazonDynamoDBScalaMapper {
634620
scanFilter: Map[String, Condition] = Map.empty,
635621
limit: Int = 0
636622
)(implicit serializer: DynamoDBSerializer[T]): Future[Seq[T]] = {
623+
scanProgressively(scanFilter, limit).map(_._2)
624+
}
625+
626+
/**
627+
* Scan a table.
628+
*
629+
* This method will issue one scan request, stopping either
630+
* at the supplied limit or at the response size limit.
631+
*
632+
* @param scanFilter
633+
* the optional filter conditions for the scan.
634+
* @param lastEvaluatedKey
635+
* the optional starting key.
636+
* @param limit
637+
* the optional limit for the number of items to return.
638+
* @return Tuple of
639+
* some last dynamoDB key or none if the "end" of the scan is reached
640+
* and sequence of scanned objects in a future.
641+
* @see [[countScan]]
642+
*/
643+
def scanProgressively[T](
644+
scanFilter: Map[String, Condition] = Map.empty,
645+
limit: Int = 0,
646+
lastEvaluatedKey: Option[DynamoDBKey] = None
647+
)(implicit serializer: DynamoDBSerializer[T]): Future[(Option[DynamoDBKey], Seq[T])] = {
637648
val scanRequest =
638649
new ScanRequest()
639-
.withTableName(tableName)
640-
.withScanFilter(scanFilter.asJava)
650+
.withTableName(tableName)
651+
.withScanFilter(scanFilter.asJava)
652+
641653
if (limit > 0)
642654
scanRequest.setLimit(limit)
655+
643656
if (logger.isDebugEnabled)
644657
scanRequest.setReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
645658

646-
client.scan(scanRequest) map { result =>
659+
client.scan(
660+
scanRequest.withExclusiveStartKey(lastEvaluatedKey.orNull)
661+
) map { result =>
647662
if (logger.isDebugEnabled)
648663
logger.debug(s"scanOnce() ConsumedCapacity = ${result.getConsumedCapacity()}")
649664

650-
result.getItems.asScala.view map { item =>
665+
val r = result.getItems.asScala map { item =>
651666
serializer.fromAttributeMap(item.asScala)
652667
}
668+
669+
val lastEvaluatedKey = Option {
670+
result.getLastEvaluatedKey
671+
}
672+
(lastEvaluatedKey, r)
653673
}
654674
}
655675

656-
657-
658676
/**
659677
* Scan a table and return a count.
660678
*

0 commit comments

Comments
 (0)