Skip to content

Commit 1c2ad0e

Browse files
authoredJun 24, 2020
Merge pull request #17 from dengliming/f_cf_commands
Add Support for Cuckoo Filter Commands
2 parents 9bd2296 + 4dbfda2 commit 1c2ad0e

File tree

3 files changed

+237
-13
lines changed

3 files changed

+237
-13
lines changed
 

‎README.md

+11-11
Original file line numberDiff line numberDiff line change
@@ -73,17 +73,17 @@ func main() {
7373

7474
| Command | Recommended API and godoc |
7575
| :--- | ----: |
76-
| [CF.RESERVE](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfreserve) | N/A |
77-
| [CF.ADD](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfadd) | N/A |
78-
| [CF.ADDNX](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfaddnx) | N/A |
79-
| [CF.INSERT](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfinsert) | N/A |
80-
| [CF.INSERTNX](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfinsertnx) | N/A |
81-
| [CF.EXISTS](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfexists) | N/A |
82-
| [CF.DEL](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfdel) | N/A |
83-
| [CF.COUNT](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfcount) | N/A |
84-
| [CF.SCANDUMP](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfscandump) | N/A |
85-
| [CF.LOADCHUNK](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfloadchunck) | N/A |
86-
| [CF.INFO](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfinfo) | N/A |
76+
| [CF.RESERVE](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfreserve) | [CfReserve](https://godoc.org/github.com/RedisBloom/redisbloom-go#Client.CfReserve) |
77+
| [CF.ADD](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfadd) | [CfAdd](https://godoc.org/github.com/RedisBloom/redisbloom-go#Client.CfAdd) |
78+
| [CF.ADDNX](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfaddnx) | [CfAddNx](https://godoc.org/github.com/RedisBloom/redisbloom-go#Client.CfAddNx) |
79+
| [CF.INSERT](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfinsert) | [CfInsert](https://godoc.org/github.com/RedisBloom/redisbloom-go#Client.CfInsert) |
80+
| [CF.INSERTNX](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfinsertnx) | [CfInsertNx](https://godoc.org/github.com/RedisBloom/redisbloom-go#Client.CfInsertNx) |
81+
| [CF.EXISTS](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfexists) | [CfExists](https://godoc.org/github.com/RedisBloom/redisbloom-go#Client.CfExists) |
82+
| [CF.DEL](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfdel) | [CfDel](https://godoc.org/github.com/RedisBloom/redisbloom-go#Client.CfDel) |
83+
| [CF.COUNT](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfcount) | [CfCount](https://godoc.org/github.com/RedisBloom/redisbloom-go#Client.CfCount) |
84+
| [CF.SCANDUMP](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfscandump) | [CfScanDump](https://godoc.org/github.com/RedisBloom/redisbloom-go#Client.CfScanDump) |
85+
| [CF.LOADCHUNK](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfloadchunck) | [CfLoadChunk](https://godoc.org/github.com/RedisBloom/redisbloom-go#Client.CfLoadChunk) |
86+
| [CF.INFO](https://oss.redislabs.com/redisbloom/Cuckoo_Commands/#cfinfo) | [CfInfo](https://godoc.org/github.com/RedisBloom/redisbloom-go#Client.CfInfo) |
8787

8888
### Count-Min Sketch
8989

‎client.go

+112-2
Original file line numberDiff line numberDiff line change
@@ -269,9 +269,119 @@ func (client *Client) CmsMerge(key string, srcs []string, weights []string) (str
269269
func (client *Client) CmsInfo(key string) (map[string]int64, error) {
270270
conn := client.Pool.Get()
271271
defer conn.Close()
272-
reply, err := conn.Do("CMS.INFO", key)
272+
return ParseInfoReply(redis.Values(conn.Do("CMS.INFO", key)))
273+
}
273274

274-
values, err := redis.Values(reply, err)
275+
// Create an empty cuckoo filter with an initial capacity of {capacity} items.
276+
func (client *Client) CfReserve(key string, capacity int64, bucketSize int64, maxIterations int64, expansion int64) (string, error) {
277+
conn := client.Pool.Get()
278+
defer conn.Close()
279+
args := redis.Args{key}.Add(capacity)
280+
if bucketSize > 0 {
281+
args = args.Add("BUCKETSIZE", bucketSize)
282+
}
283+
if maxIterations > 0 {
284+
args = args.Add("MAXITERATIONS", maxIterations)
285+
}
286+
if expansion > 0 {
287+
args = args.Add("EXPANSION", expansion)
288+
}
289+
return redis.String(conn.Do("CF.RESERVE", args...))
290+
}
291+
292+
// Adds an item to the cuckoo filter, creating the filter if it does not exist.
293+
func (client *Client) CfAdd(key string, item string) (bool, error) {
294+
conn := client.Pool.Get()
295+
defer conn.Close()
296+
return redis.Bool(conn.Do("CF.ADD", key, item))
297+
}
298+
299+
// Adds an item to a cuckoo filter if the item did not exist previously.
300+
func (client *Client) CfAddNx(key string, item string) (bool, error) {
301+
conn := client.Pool.Get()
302+
defer conn.Close()
303+
return redis.Bool(conn.Do("CF.ADDNX", key, item))
304+
}
305+
306+
// Adds one or more items to a cuckoo filter, allowing the filter to be created with a custom capacity if it does not yet exist.
307+
func (client *Client) CfInsert(key string, cap int64, noCreate bool, items []string) ([]int64, error) {
308+
conn := client.Pool.Get()
309+
defer conn.Close()
310+
args := GetInsertArgs(key, cap, noCreate, items)
311+
return redis.Int64s(conn.Do("CF.INSERT", args...))
312+
}
313+
314+
// Adds one or more items to a cuckoo filter, allowing the filter to be created with a custom capacity if it does not yet exist.
315+
func (client *Client) CfInsertNx(key string, cap int64, noCreate bool, items []string) ([]int64, error) {
316+
conn := client.Pool.Get()
317+
defer conn.Close()
318+
args := GetInsertArgs(key, cap, noCreate, items)
319+
return redis.Int64s(conn.Do("CF.INSERTNX", args...))
320+
}
321+
322+
func GetInsertArgs(key string, cap int64, noCreate bool, items []string) redis.Args {
323+
args := redis.Args{key}
324+
if cap > 0 {
325+
args = args.Add("CAPACITY", cap)
326+
}
327+
if noCreate {
328+
args = args.Add("NOCREATE")
329+
}
330+
args = args.Add("ITEMS").AddFlat(items)
331+
return args
332+
}
333+
334+
// Check if an item exists in a Cuckoo Filter
335+
func (client *Client) CfExists(key string, item string) (bool, error) {
336+
conn := client.Pool.Get()
337+
defer conn.Close()
338+
return redis.Bool(conn.Do("CF.EXISTS", key, item))
339+
}
340+
341+
// Deletes an item once from the filter.
342+
func (client *Client) CfDel(key string, item string) (bool, error) {
343+
conn := client.Pool.Get()
344+
defer conn.Close()
345+
return redis.Bool(conn.Do("CF.DEL", key, item))
346+
}
347+
348+
// Returns the number of times an item may be in the filter.
349+
func (client *Client) CfCount(key string, item string) (int64, error) {
350+
conn := client.Pool.Get()
351+
defer conn.Close()
352+
return redis.Int64(conn.Do("CF.COUNT", key, item))
353+
}
354+
355+
// Begins an incremental save of the cuckoo filter.
356+
func (client *Client) CfScanDump(key string, iter int64) (int64, []byte, error) {
357+
conn := client.Pool.Get()
358+
defer conn.Close()
359+
reply, err := redis.Values(conn.Do("CF.SCANDUMP", key, iter))
360+
if err != nil || len(reply) != 2 {
361+
return 0, nil, err
362+
}
363+
iter = reply[0].(int64)
364+
if reply[1] == nil {
365+
return iter, nil, err
366+
}
367+
return iter, reply[1].([]byte), err
368+
}
369+
370+
// Restores a filter previously saved using SCANDUMP
371+
func (client *Client) CfLoadChunk(key string, iter int64, data []byte) (string, error) {
372+
conn := client.Pool.Get()
373+
defer conn.Close()
374+
return redis.String(conn.Do("CF.LOADCHUNK", key, iter, data))
375+
}
376+
377+
// Return information about key
378+
func (client *Client) CfInfo(key string) (map[string]int64, error) {
379+
conn := client.Pool.Get()
380+
defer conn.Close()
381+
return ParseInfoReply(redis.Values(conn.Do("CF.INFO", key)))
382+
}
383+
384+
func ParseInfoReply(values []interface{}, err error) (map[string]int64, error) {
275385
if err != nil {
276386
return nil, err
277387
}

‎client_test.go

+114
Original file line numberDiff line numberDiff line change
@@ -280,3 +280,117 @@ func TestClient_CmsInfo(t *testing.T) {
280280
assert.Equal(t, int64(5), info["depth"])
281281
assert.Equal(t, int64(0), info["count"])
282282
}
283+
284+
func TestClient_CfReserve(t *testing.T) {
285+
client.FlushAll()
286+
key := "test_cf_reserve"
287+
ret, err := client.CfReserve(key, 1000, -1, -1, -1)
288+
assert.Nil(t, err)
289+
assert.Equal(t, "OK", ret)
290+
}
291+
292+
func TestClient_CfAdd(t *testing.T) {
293+
client.FlushAll()
294+
key := "test_cf_add"
295+
ret, err := client.CfAdd(key, "a")
296+
assert.Nil(t, err)
297+
assert.True(t, ret)
298+
ret, err = client.CfAddNx(key, "b")
299+
assert.Nil(t, err)
300+
assert.True(t, ret)
301+
}
302+
303+
func TestClient_CfInsert(t *testing.T) {
304+
client.FlushAll()
305+
key := "test_cf_insert"
306+
ret, err := client.CfInsert(key, 1000, false, []string{"a"})
307+
assert.Nil(t, err)
308+
assert.Equal(t, 1, len(ret))
309+
assert.True(t, ret[0] > 0)
310+
ret, err = client.CfInsertNx(key, 1000, true, []string{"b"})
311+
assert.Nil(t, err)
312+
assert.Equal(t, 1, len(ret))
313+
assert.True(t, ret[0] > 0)
314+
}
315+
316+
func TestClient_CfExists(t *testing.T) {
317+
client.FlushAll()
318+
key := "test_cf_exists"
319+
ret, err := client.CfAdd(key, "a")
320+
assert.Nil(t, err)
321+
assert.True(t, ret)
322+
ret, err = client.CfExists(key, "a")
323+
assert.Nil(t, err)
324+
assert.True(t, ret)
325+
}
326+
327+
func TestClient_CfDel(t *testing.T) {
328+
client.FlushAll()
329+
key := "test_cf_del"
330+
ret, err := client.CfAdd(key, "a")
331+
assert.Nil(t, err)
332+
assert.True(t, ret)
333+
ret, err = client.CfExists(key, "a")
334+
assert.Nil(t, err)
335+
assert.True(t, ret)
336+
ret, err = client.CfDel(key, "a")
337+
assert.Nil(t, err)
338+
assert.True(t, ret)
339+
ret, err = client.CfExists(key, "a")
340+
assert.Nil(t, err)
341+
assert.False(t, ret)
342+
}
343+
344+
func TestClient_CfCount(t *testing.T) {
345+
client.FlushAll()
346+
key := "test_cf_count"
347+
ret, err := client.CfAdd(key, "a")
348+
assert.Nil(t, err)
349+
assert.True(t, ret)
350+
count, err := client.CfCount(key, "a")
351+
assert.Nil(t, err)
352+
assert.Equal(t, int64(1), count)
353+
}
354+
355+
func TestClient_CfScanDump(t *testing.T) {
356+
client.FlushAll()
357+
key := "test_cf_scandump"
358+
ret, err := client.CfReserve(key, 100, 50, -1, -1)
359+
assert.Nil(t, err)
360+
assert.Equal(t, "OK", ret)
361+
client.CfAdd(key, "a")
362+
curIter := int64(0)
363+
chunks := make([]map[string]interface{}, 0)
364+
for {
365+
iter, data, err := client.CfScanDump(key, curIter)
366+
assert.Nil(t, err)
367+
curIter = iter
368+
if iter == int64(0) {
369+
break
370+
}
371+
chunk := map[string]interface{}{"iter": iter, "data": data}
372+
chunks = append(chunks, chunk)
373+
}
374+
client.FlushAll()
375+
for i := 0; i < len(chunks); i++ {
376+
ret, err := client.CfLoadChunk(key, chunks[i]["iter"].(int64), chunks[i]["data"].([]byte))
377+
assert.Nil(t, err)
378+
assert.Equal(t, "OK", ret)
379+
}
380+
exists, err := client.CfExists(key, "a")
381+
assert.True(t, exists)
382+
}
383+
384+
func TestClient_CfInfo(t *testing.T) {
385+
client.FlushAll()
386+
key := "test_cf_info"
387+
ret, err := client.CfAdd(key, "a")
388+
assert.Nil(t, err)
389+
assert.True(t, ret)
390+
info, err := client.CfInfo(key)
391+
assert.Equal(t, int64(1080), info["Size"])
392+
assert.Equal(t, int64(512), info["Number of buckets"])
393+
assert.Equal(t, int64(0), info["Number of filter"])
394+
assert.Equal(t, int64(1), info["Number of items inserted"])
395+
assert.Equal(t, int64(0), info["Max iteration"])
396+
}

0 commit comments

Comments
 (0)
Please sign in to comment.