Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 165 additions & 0 deletions py2store/persisters/cassandra.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
from abc import ABC
import uuid

from py2store.base import Persister
from cassandra.cluster import Cluster


# --- Table names --------------------------------------------------------------
# Two tables are used: one maps the caller-facing key text to a generated uuid,
# the other maps that uuid to the stored value text.
KEYS_TABLE = "storage_keys"
VALUES_TABLE = "storage_values"

# --- Schema bootstrap (every statement is IF NOT EXISTS / idempotent) ---------
KEYSPACE_DFN = (
    "CREATE KEYSPACE IF NOT EXISTS py2store WITH REPLICATION = {"
    "'class': 'SimpleStrategy',"
    "'replication_factor': '1'"
    "};"
)
USE_KEYSPACE = "USE py2store"
KEYS_TABLE_DFN = (
    f"CREATE TABLE IF NOT EXISTS {KEYS_TABLE} "
    "(id uuid, OUTER_KEY TEXT primary key)"
)
VALUES_TABLE_DFN = (
    f"CREATE TABLE IF NOT EXISTS {VALUES_TABLE} "
    "(key_id uuid primary key, VALUE TEXT)"
)

# --- Reads (``%s`` placeholders are bound by the driver, not by Python) -------
SELECT_KEYS_QUERY = f"SELECT OUTER_KEY FROM {KEYS_TABLE}"
FIND_KEY_QUERY = f"SELECT id, OUTER_KEY from {KEYS_TABLE} WHERE OUTER_KEY = %s"
FIND_VALUE_QUERY = f"SELECT VALUE FROM {VALUES_TABLE} WHERE key_id = %s"
COUNT_QUERY = f"SELECT COUNT(*) FROM {KEYS_TABLE}"

# --- Writes -------------------------------------------------------------------
INSERT_KEY_QUERY = f"INSERT INTO {KEYS_TABLE} (id, OUTER_KEY) VALUES (%s, %s)"
INSERT_VALUE_QUERY = f"INSERT INTO {VALUES_TABLE} (key_id, VALUE) VALUES (%s, %s)"
# NOTE: despite its name, this statement deletes by OUTER_KEY (the table's
# primary key), not by the uuid ``id`` column.
DEL_KEY_BY_ID_QUERY = f"DELETE FROM {KEYS_TABLE} WHERE OUTER_KEY = %s"
DEL_VAL_BY_KEY_ID_QUERY = f"DELETE FROM {VALUES_TABLE} WHERE key_id = %s"


class CassandraSessionManager(ABC):
    """Run the key/value CQL statements of this module on a given session.

    Storage layout (see the ``*_DFN`` statements in this module): the keys
    table maps the caller's key text (``OUTER_KEY``) to a generated uuid, and
    the values table maps that uuid (``key_id``) to the value text.

    :param session: object exposing ``execute(query, params)``. The ``None``
        default is not usable in practice: the constructor executes the
        schema statements on it immediately.
    """

    def __init__(self, session=None):
        self._session = session

        # Bootstrap keyspace and tables. Every statement is IF NOT EXISTS (or
        # USE), so running them on each construction is harmless.
        for command in (KEYSPACE_DFN, USE_KEYSPACE, KEYS_TABLE_DFN, VALUES_TABLE_DFN):
            self._exec_command(command, tuple())

        super(CassandraSessionManager, self).__init__()

    def _exec_command(self, command, params):
        """Execute a statement whose result is not needed."""
        self._session.execute(command, params)

    def _query(self, query):
        """Lazily yield the rows produced by a parameterless query."""
        yield from self._session.execute(query)

    def _query_all(self, query, params):
        """Execute a parameterized query and return the driver's result object."""
        return self._session.execute(query, params)

    def _first_row(self, query, params):
        """Return the first row of a query result, or ``None`` if it is empty.

        The result is consumed by iteration rather than ``result[0]`` so that
        no positional indexing of the driver's result set is needed; this also
        works for any plain iterable of rows.
        """
        return next(iter(self._query_all(query, params)), None)

    def _key_id_of(self, k):
        """Return the uuid stored for key ``k``; raise ``KeyError`` if absent."""
        key_row = self._first_row(FIND_KEY_QUERY, (k,))
        if key_row is None:
            raise KeyError(f"No document found for query: {k}")
        return key_row[0]

    def iter_keys(self):
        """Yield the ``OUTER_KEY`` of every row in the keys table."""
        for row in self._query(SELECT_KEYS_QUERY):
            yield row[0]

    def rows_count(self):
        """Return the number of rows in the keys table."""
        return self._first_row(COUNT_QUERY, tuple())[0]

    def get_item(self, k):
        """Return the value stored under key ``k``.

        :raises KeyError: if the key is unknown, or if the key exists but no
            value row is found for its uuid.
        """
        value_row = self._first_row(FIND_VALUE_QUERY, (self._key_id_of(k),))
        if value_row is None:
            raise KeyError(f"No document found for query: {k}")
        return value_row[0]

    def del_item(self, k):
        """Delete key ``k`` and its value.

        :raises KeyError: if the key is unknown.
        """
        key_id = self._key_id_of(k)
        # Value first, then key: the key row is what makes the pair reachable,
        # so it is removed last.
        self._exec_command(DEL_VAL_BY_KEY_ID_QUERY, (key_id,))
        self._exec_command(DEL_KEY_BY_ID_QUERY, (k,))

    def insert(self, k, v):
        """Store ``v`` under ``k``, replacing any existing entry for ``k``."""
        # Drop the previous key/value pair (if any) so the old value row,
        # which is addressed by the old uuid, does not linger.
        try:
            self.del_item(k)
        except KeyError:
            pass
        key_id = uuid.uuid4()
        self._exec_command(INSERT_KEY_QUERY, (key_id, k))
        self._exec_command(INSERT_VALUE_QUERY, (key_id, v))


class CassandraPersister(Persister):
    """
    A basic Cassandra persister storing text keys and text values.

    Keys and values are handed to the driver unchanged, and both the key and
    the value column are declared ``TEXT`` in this module's schema, so they
    must be ``str`` here. To store richer objects, serialize them to text
    before they reach this class.

    The example needs a reachable Cassandra node, so it is skipped by doctest.

    >>> s = CassandraPersister()  # doctest: +SKIP
    >>> s['foo'] = 'bar'  # doctest: +SKIP
    >>> s['foo']  # doctest: +SKIP
    'bar'
    >>> 'foo' in list(s)  # doctest: +SKIP
    True
    >>> del s['foo']  # doctest: +SKIP
    >>> 'foo' in list(s)  # doctest: +SKIP
    False
    >>> s.close()  # doctest: +SKIP
    """

    def __init__(
            self,
            url='localhost',
            cassandra_kwargs=None
    ):
        """Connect to Cassandra and prepare the storage tables.

        :param url: a single host name, or a list of contact points; passed
            as the first argument to ``Cluster``.
        :param cassandra_kwargs: extra keyword arguments for ``Cluster``;
            defaults to ``{'port': 9042}``.
        """
        if cassandra_kwargs is None:
            cassandra_kwargs = {'port': 9042}
        if isinstance(url, str):
            # A bare string is wrapped so Cluster receives a list of hosts.
            url = [url]

        self._cluster = Cluster(url, **cassandra_kwargs)
        self._session = self._cluster.connect()
        # Constructing the manager also creates the keyspace/tables if needed.
        self._session_manager = CassandraSessionManager(self._session)

    def close(self):
        """Shut down the cluster connection opened by the constructor."""
        self._cluster.shutdown()

    def clear(self):
        raise NotImplementedError(
            "clear is disabled by default, for your own protection! "
            "Loop and delete if you really want to."
        )

    def __getitem__(self, k):
        """Return the value stored under ``k``; ``KeyError`` if absent."""
        return self._session_manager.get_item(k)

    def __setitem__(self, k, v):
        """Store ``v`` under ``k``, replacing any previous value."""
        self._session_manager.insert(k, v)

    def __delitem__(self, k):
        """Delete ``k`` and its value; ``KeyError`` if absent."""
        self._session_manager.del_item(k)

    def __iter__(self):
        """Iterate over the stored keys."""
        return self._session_manager.iter_keys()

    def __len__(self):
        """Return the number of stored keys."""
        return self._session_manager.rows_count()
79 changes: 79 additions & 0 deletions py2store/stores/cassandra_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import codecs
import pickle

from functools import wraps
from py2store.base import Store
from py2store.persisters.cassandra import CassandraPersister


class PickleSerializer:
    """Convert picklable objects to base64 text and back.

    NOTE(review): ``loads`` unpickles whatever text it is given, and
    unpickling crafted data can execute arbitrary code. Only use it on data
    from a trusted source.
    """

    @staticmethod
    def loads(pickled):
        """Rebuild the object from text produced by :meth:`dumps`."""
        raw = codecs.decode(pickled.encode(), "base64")
        return pickle.loads(raw)

    @staticmethod
    def dumps(obj):
        """Pickle ``obj`` and return the bytes as base64-encoded ``str``."""
        blob = pickle.dumps(obj)
        encoded = codecs.encode(blob, "base64")
        return encoded.decode()


class CassandraStore(Store):
    """Store over a ``CassandraPersister`` that pickles keys and values.

    Keys and values are converted to text with ``PickleSerializer`` on the way
    in and converted back on the way out.
    """

    @wraps(CassandraPersister.__init__)
    def __init__(self, *args, **kwargs):
        # All arguments are forwarded to the persister's constructor.
        backend = CassandraPersister(*args, **kwargs)
        self.serializer = PickleSerializer()
        super(CassandraStore, self).__init__(backend)

    def clear(self):
        """Delegate to the parent class's ``clear``."""
        super().clear()

    # --- key conversion -------------------------------------------------------
    def _id_of_key(self, k):
        """Serialize an outer key to its stored text form."""
        return self.serializer.dumps(k)

    def _key_of_id(self, _id):
        """Deserialize a stored key back to the outer key."""
        return self.serializer.loads(_id)

    # --- value conversion -----------------------------------------------------
    def _data_of_obj(self, v):
        """Serialize a value to its stored text form."""
        return self.serializer.dumps(v)

    def _obj_of_data(self, data):
        """Deserialize stored text back to the original value."""
        return self.serializer.loads(data)


def test_cassandra_store(s=None, k=None, v=None):
    """Smoke-test a store's mapping interface with two key/value pairs.

    :param s: store under test; a ``CassandraStore()`` is created when omitted.
    :param k: key for the first scenario (default ``{'_id': 'foo'}``).
    :param v: value for the first scenario (default ``{'val': 'bar'}``).

    The second scenario always uses a tuple key and a dict value.
    """
    if s is None:
        # Built lazily: a ``CassandraStore()`` default argument would be
        # evaluated when the module is imported and open a connection then.
        s = CassandraStore()
    if k is None:
        k = {'_id': 'foo'}
    if v is None:
        v = {'val': 'bar'}

    scenarios = (
        (k, v),
        ((1234, 'user'), {'name': 'bob', 'age': 42}),
    )
    for key, val in scenarios:
        _check_store_roundtrip(s, key, val)


def _check_store_roundtrip(s, k, v):
    """Insert, read back and delete one pair, checking the store at each step."""
    if k in s:  # start from a state where the key is absent
        del s[k]
    assert (k in s) is False  # see that key is not in store (and testing __contains__)
    orig_length = len(s)
    s[k] = v
    assert len(s) == orig_length + 1
    assert k in list(s)
    assert s[k] == v
    assert s.get(k) == v
    assert v in list(s.values())
    assert (k in s) is True  # testing __contains__ again
    del s[k]
    # Compare against the starting size, not 0: the store may already hold
    # unrelated entries.
    assert len(s) == orig_length


if __name__ == '__main__':
    test_cassandra_store()