Skip to content

Commit

Permalink
Add merge functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewdalpino committed Nov 4, 2024
1 parent 8ecb28f commit 3a6e837
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 16 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,4 @@ True

## References
- [1] A. DalPino. (2021). OkBloomer, a novel autoscaling Bloom Filter [[link](https://github.com/andrewdalpino/OkBloomer)].
- [2] K. Christensen, et al. A New Analysis of the False-Positive Rate of a Bloom Filter.
74 changes: 59 additions & 15 deletions src/okbloomer/bloom_filter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from math import log

import numpy as np
import mmh3

Expand Down Expand Up @@ -51,11 +49,17 @@ def __init__(
f"Slice size must be less than {self.MAX_SLICE_SIZE}, {slice_size} given."
)

max_bits_per_layer = round(
layer_size * max_false_positive_rate ** (1 / num_hashes)
)

self._max_false_positive_rate = max_false_positive_rate
self._num_hashes = num_hashes
self._layer_size = layer_size
self._slice_size = slice_size
self._layers: List[NDArray] = []
self._max_bits = 0
self._max_bits_per_layer = max_bits_per_layer
self._n = 0
self._m = 0

Expand All @@ -73,10 +77,6 @@ def num_hashes(self) -> int:
def layer_size(self) -> int:
return self._layer_size

@property
def slice_size(self) -> int:
return self._slice_size

@property
def layers(self) -> List[NDArray]:
return self._layers
Expand Down Expand Up @@ -112,32 +112,34 @@ def false_positive_rate(self) -> float:

def insert(self, token: str) -> None:
    """Insert a token into the filter.

    Bits are only ever set in the newest (head) layer; older layers are
    frozen once a new layer has been added.

    :param token: the token to insert
    """

    offsets = self._hash(token)

    layer = self._layers[-1]

    changed = False

    for offset in offsets:
        if not layer[offset]:
            layer[offset] = True

            # _n tracks the total number of set bits across all layers.
            self._n += 1

            changed = True

    # Autoscale: once the bit budget is exhausted, freeze this layer and
    # start a fresh one so the false positive rate stays bounded.
    if changed and self._n >= self._max_bits:
        self._add_layer()

def exists(self, token: str) -> bool:
"""Does the given token exist within the filter?"""

offsets = self._hash(token)

for layer in self._layers:
hits = 0

for offset in offsets:
if layer[offset] == False:
if not layer[offset]:
break

hits += 1
Expand All @@ -149,13 +151,14 @@ def exists(self, token: str) -> bool:

def exists_or_insert(self, token: str) -> bool:
"""Does the token exist in the filter? If not, then insert it."""

offsets = self._hash(token)

for layer in self._layers[:-1]:
hits = 0

for offset in offsets:
if layer[offset] == False:
if not layer[offset]:
break

hits += 1
Expand All @@ -168,32 +171,73 @@ def exists_or_insert(self, token: str) -> bool:
exists = True

for offset in offsets:
if layer[offset] == False:
if not layer[offset]:
layer[offset] = True

self._n += 1

exists = False

if not exists and self.false_positive_rate > self._max_false_positive_rate:
if not exists and self._n >= self._max_bits:
self._add_layer()

return exists

def merge(self, filter: Self) -> None:
"""Merge this filter with another filter."""

if self._num_hashes != filter.num_hashes:
raise ValueError("Filters must have the same number of hash functions.")

if self._layer_size != filter.layer_size:
raise ValueError("Filters must have the same layer size.")

a, b = self._layers.pop(), filter.layers.pop()

layers = self._layers + filter.layers

a_num_bits, b_num_bits = np.sum(a), np.sum(b)

can_combine_heads = a_num_bits + b_num_bits <= self._max_bits_per_layer

if can_combine_heads:
layers.append(np.bitwise_or(a, b))

self._n += filter.n

else:
if a_num_bits < b_num_bits:
a, b = b, a

layers.extend([a, b])

self._n += filter.num_layers * self._max_bits_per_layer

num_layers = len(layers)

self._layers = layers
self._m = num_layers * self.layer_size
self._max_bits = num_layers * self._max_bits_per_layer

def _add_layer(self) -> None:
"""Add another layer to the filter for maintaining the false positivity rate below the threshold."""
self._layers.append(np.zeros(self.layer_size, dtype="bool"))

self._m += self.layer_size
layer = np.zeros(self._layer_size, dtype="bool")

self._layers.append(layer)

self._m += self._layer_size
self._max_bits += self._max_bits_per_layer

def _hash(self, token: str) -> list:
"""Return a list of filter offsets from a given token."""

offsets = []

for i in range(1, self._num_hashes + 1):
offset = mmh3.hash(token, seed=i, signed=False)

offset %= self.slice_size
offset %= self._slice_size
offset *= i

offsets.append(int(offset))
Expand Down
40 changes: 39 additions & 1 deletion tests/test_bloom_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ def test_instantiate(self):
self.assertEqual(0.001, filter.max_false_positive_rate)
self.assertEqual(16, filter.num_hashes)
self.assertEqual(64000, filter.layer_size)
self.assertEqual(4000, filter.slice_size)

def test_insert_and_exists(self):
filter = BloomFilter()
Expand Down Expand Up @@ -83,3 +82,42 @@ def test_autoscaling(self):
self.assertLessEqual(filter.false_positive_rate, 0.001)
self.assertLessEqual(filter.utilization, 1.0)
self.assertGreater(filter.capacity, 0.0)

def test_merge(self):
    """Merging keeps every token from both filters queryable from the merged one."""

    a = BloomFilter(max_false_positive_rate=0.001, layer_size=320000)
    b = BloomFilter(max_false_positive_rate=0.001, layer_size=320000)

    # Pre-load both filters with random 20-character tokens.
    for bloom in [a, b]:
        for i in range(0, 20000):
            bloom.insert(
                "".join(random.choice(string.ascii_letters) for j in range(20))
            )

    a.insert("foo")
    a.insert("bar")

    b.insert("baz")
    b.insert("qux")

    # Before the merge each filter only recognizes its own tokens.
    for token in ("foo", "bar"):
        self.assertTrue(a.exists(token))

    for token in ("baz", "qux"):
        self.assertFalse(a.exists(token))

    for token in ("foo", "bar"):
        self.assertFalse(b.exists(token))

    for token in ("baz", "qux"):
        self.assertTrue(b.exists(token))

    a.merge(b)

    # After the merge, the merged filter answers for both token sets.
    for token in ("foo", "bar", "baz", "qux"):
        self.assertTrue(a.exists(token))

    self.assertEqual(a.num_layers, 3)
    self.assertLessEqual(a.false_positive_rate, 0.001)
    self.assertLessEqual(a.utilization, 1.0)
    self.assertGreater(a.capacity, 0.0)

0 comments on commit 3a6e837

Please sign in to comment.