Skip to content
This repository was archived by the owner on Jan 13, 2025. It is now read-only.

Commit a13a5a5

Browse files
authored
selectStream performance (#189)
* made selectStream more efficient by batching multiple rows is a single zio and using chunks * drop scala 2.13 api use
1 parent 9da28a3 commit a13a5a5

File tree

2 files changed

+20
-12
lines changed

2 files changed

+20
-12
lines changed

core/src/main/scala/zio/jdbc/Query.scala

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,19 +54,27 @@ final case class Query[+A](sql: SqlFragment, decode: ZResultSet => A) {
5454
/**
5555
* Performs a SQL select query, returning a stream of results.
5656
*/
57-
def selectStream: ZStream[ZConnection, Throwable, A] =
57+
def selectStream(chunkSize: => Int = ZStream.DefaultChunkSize): ZStream[ZConnection, Throwable, A] =
5858
ZStream.unwrapScoped {
5959
for {
6060
zrs <- executeQuery(sql)
61-
stream = ZStream.repeatZIOOption {
62-
ZIO
63-
.suspend(if (zrs.next()) ZIO.attempt(Some(decode(zrs))) else ZIO.none)
64-
.mapError(Option(_))
65-
.flatMap {
66-
case None => ZIO.fail(None)
67-
case Some(v) => ZIO.succeed(v)
61+
stream = ZStream.paginateChunkZIO(())(_ =>
62+
ZIO.attemptBlocking {
63+
val builder = ChunkBuilder.make[A](chunkSize)
64+
var hasNext = false
65+
var i = 0
66+
while (
67+
i < chunkSize && {
68+
hasNext = zrs.next()
69+
hasNext
70+
}
71+
) {
72+
builder.addOne(decode(zrs))
73+
i += 1
6874
}
69-
}
75+
(builder.result(), if (hasNext) Some(()) else None)
76+
}
77+
)
7078
} yield stream
7179
}
7280

core/src/test/scala/zio/jdbc/ZConnectionPoolSpec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -329,11 +329,11 @@ object ZConnectionPoolSpec extends ZIOSpecDefault {
329329
} +
330330
test("select stream") {
331331
for {
332-
_ <- createUsers *> insertSherlock *> insertWatson
332+
_ <- createUsersNoId *> insertFive
333333
value <- transaction {
334-
sql"select name, age from users".query[User].selectStream.runCollect
334+
sql"select name, age from users_no_id".query[UserNoId].selectStream(2).chunks.runCollect
335335
}
336-
} yield assertTrue(value == Chunk(sherlockHolmes, johnWatson))
336+
} yield assertTrue(value == Chunk(Chunk(user1, user2), Chunk(user3, user4), Chunk(user5)))
337337
} +
338338
test("delete") {
339339
for {

0 commit comments

Comments
 (0)