Skip to content

Commit ecd0c15

Browse files
authored
Issue #205: toRedisByteLIST() (#216)
* issue #205: a prototype of toRedisByteLIST() function * issue #205: added doc
1 parent 6a19452 commit ecd0c15

File tree

4 files changed

+98
-4
lines changed

4 files changed

+98
-4
lines changed

doc/rdd.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,15 @@ sc.toRedisFixedLIST(listRDD, listName, listSize)
137137
The `listRDD` is an RDD that contains all of the list's string elements in order, and `listName` is the list's key name.
138138
`listSize` is an integer which specifies the size of the Redis list; it is optional, and will default to an unlimited size.
139139

140+
Use the following to store an RDD of binary values in a Redis List:
141+
142+
```scala
143+
sc.toRedisByteLIST(byteListRDD)
144+
```
145+
146+
The `byteListRDD` is an RDD of tuples (`list name`, `list values`) represented as byte arrays.
147+
148+
140149
#### Sets
141150
For storing data in a Redis Set, use `toRedisSET` as follows:
142151

src/main/scala/com/redislabs/provider/redis/RedisConfig.scala

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,14 +166,25 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
166166
}
167167

168168
/**
169-
* @param key
170169
* *IMPORTANT* Please remember to close after using
171-
* @return jedis who is a connection for a given key
170+
*
171+
* @param key
172+
* @return jedis that is a connection for a given key
172173
*/
173174
def connectionForKey(key: String): Jedis = {
174175
getHost(key).connect()
175176
}
176177

178+
/**
179+
* *IMPORTANT* Please remember to close after using
180+
*
181+
* @param key
182+
* @return jedis is a connection for a given key
183+
*/
184+
def connectionForKey(key: Array[Byte]): Jedis = {
185+
getHost(key).connect()
186+
}
187+
177188
/**
178189
* @param initialHost any redis endpoint of a cluster or a single server
179190
* @return true if the target server is in cluster mode
@@ -195,9 +206,22 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
195206
*/
196207
def getHost(key: String): RedisNode = {
197208
val slot = JedisClusterCRC16.getSlot(key)
198-
hosts.filter(host => {
209+
getHostBySlot(slot)
210+
}
211+
212+
/**
213+
* @param key
214+
* @return host whose slots should involve key
215+
*/
216+
def getHost(key: Array[Byte]): RedisNode = {
217+
val slot = JedisClusterCRC16.getSlot(key)
218+
getHostBySlot(slot)
219+
}
220+
221+
private def getHostBySlot(slot: Int): RedisNode = {
222+
hosts.filter { host =>
199223
host.startSlot <= slot && host.endSlot >= slot
200-
})(0)
224+
}(0)
201225
}
202226

203227

src/main/scala/com/redislabs/provider/redis/redisFunctions.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.redislabs.provider.redis
22

33
import com.redislabs.provider.redis.rdd._
4+
import com.redislabs.provider.redis.util.ConnectionUtils.withConnection
45
import com.redislabs.provider.redis.util.PipelineUtils._
56
import org.apache.spark.SparkContext
67
import org.apache.spark.rdd.RDD
@@ -299,6 +300,19 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable {
299300
vs.foreachPartition(partition => setList(listName, partition, ttl, redisConfig, readWriteConfig))
300301
}
301302

303+
/**
304+
* Write RDD of binary values to Redis List.
305+
*
306+
* @param rdd RDD of tuples (list name, list values)
307+
* @param ttl time to live
308+
*/
309+
def toRedisByteLIST(rdd: RDD[(Array[Byte], Seq[Array[Byte]])], ttl: Int = 0)
310+
(implicit
311+
redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf),
312+
readWriteConfig: ReadWriteConfig = ReadWriteConfig.fromSparkConf(sc.getConf)) {
313+
rdd.foreachPartition(partition => setList(partition, ttl, redisConfig, readWriteConfig))
314+
}
315+
302316
/**
303317
* @param vs RDD of values
304318
* @param listName target list's name which hold all the vs
@@ -415,6 +429,30 @@ object RedisContext extends Serializable {
415429
conn.close()
416430
}
417431

432+
433+
def setList(keyValues: Iterator[(Array[Byte], Seq[Array[Byte]])],
434+
ttl: Int,
435+
redisConfig: RedisConfig,
436+
readWriteConfig: ReadWriteConfig) {
437+
implicit val rwConf: ReadWriteConfig = readWriteConfig
438+
439+
keyValues
440+
.map { case (key, listValues) =>
441+
(redisConfig.getHost(key), (key, listValues))
442+
}
443+
.toArray
444+
.groupBy(_._1)
445+
.foreach { case (node, arr) =>
446+
withConnection(node.endpoint.connect()) { conn =>
447+
foreachWithPipeline(conn, arr) { (pipeline, a) =>
448+
val (key, listVals) = a._2
449+
pipeline.rpush(key, listVals: _*)
450+
if (ttl > 0) pipeline.expire(key, ttl)
451+
}
452+
}
453+
}
454+
}
455+
418456
/**
419457
* @param key
420458
* @param listSize

src/test/scala/com/redislabs/provider/redis/rdd/RedisRddSuite.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package com.redislabs.provider.redis.rdd
22

3+
import com.redislabs.provider.redis.util.ConnectionUtils.withConnection
34
import com.redislabs.provider.redis.{RedisConfig, SparkRedisSuite, toRedisContext}
45
import org.scalatest.Matchers
6+
import scala.collection.JavaConverters._
57

68
import scala.io.Source.fromInputStream
79

@@ -109,6 +111,27 @@ trait RedisRddSuite extends SparkRedisSuite with Keys with Matchers {
109111
setContents should be(ws)
110112
}
111113

114+
test("toRedisLIST, byte array") {
115+
val list1 = Seq("a1", "b1", "c1")
116+
val list2 = Seq("a2", "b2", "c2")
117+
val keyValues = Seq(
118+
("list1", list1),
119+
("list2", list2)
120+
)
121+
val keyValueBytes = keyValues.map {case (k, list) => (k.getBytes, list.map(_.getBytes())) }
122+
val rdd = sc.parallelize(keyValueBytes)
123+
sc.toRedisByteLIST(rdd)
124+
125+
def verify(list: String, vals: Seq[String]): Unit = {
126+
withConnection(redisConfig.getHost(list).endpoint.connect()) { conn =>
127+
conn.lrange(list, 0, vals.size).asScala should be(vals.toList)
128+
}
129+
}
130+
131+
verify("list1", list1)
132+
verify("list2", list2)
133+
}
134+
112135
test("Expire") {
113136
val expireTime = 1
114137
val prefix = s"#expire in $expireTime#:"

0 commit comments

Comments
 (0)