Skip to content

Commit

Permalink
Merge pull request #20 from sergray/master
Browse files Browse the repository at this point in the history
Threaded backend for elasticsearch
  • Loading branch information
sergray authored Sep 7, 2016
2 parents 0dd2a76 + a87f37a commit 9778f39
Show file tree
Hide file tree
Showing 5 changed files with 306 additions and 17 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ clean: pyclean docsclean
venv:
@virtualenv -p python2.7 venv
@$(PIP) install -U "pip>=7.0" -q
@$(PIP) install -U "pip>=7.0" -q
@$(PIP) install -r $(DEPS)

test: venv pyclean
Expand Down
80 changes: 65 additions & 15 deletions tests/test_elasticsearch.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime

import mock
from elasticsearch.exceptions import TransportError
from fqn_decorators import get_fqn
from freezegun import freeze_time
from tests.conftest import Dummy, go
Expand All @@ -13,14 +14,35 @@ class TestConnectionErrors(TestBaseBackend):

@mock.patch('time_execution.backends.elasticsearch.logger')
def test_error_resilience(self, mocked_logger):
backend = ElasticsearchBackend() # defaults to localhost, which is unavailable
backend = ElasticsearchBackend(hosts=['non-existant-domain'])
# ensure index and mapping setup failures are caught and logged
self.assertEqual(2, len(mocked_logger.error.call_args_list))
# ensure write failure is caught and logged
backend.write(name='test_error_resilience')
mocked_logger.warning.assert_called_once()


class ElasticTestMixin(object):
    """Shared helpers for test cases that talk to an Elasticsearch backend."""

    @staticmethod
    def _clear(backend):
        # Drop the base index and any derived (e.g. date-suffixed) indices;
        # ignore=404 makes a missing index a no-op instead of an error.
        indices = backend.client.indices
        for pattern in (backend.index, "{}*".format(backend.index)):
            indices.delete(pattern, ignore=404)

    @staticmethod
    def _query_backend(backend, name):
        # Refresh first so freshly indexed documents are visible to search,
        # then return every metric document whose name matches exactly.
        index = backend.get_index()
        backend.client.indices.refresh(index)
        query = {"query": {"term": {"name": name}}}
        return backend.client.search(index=index, body=query)


class TestTimeExecution(TestBaseBackend):
def setUp(self):
super(TestTimeExecution, self).setUp()
Expand All @@ -36,21 +58,10 @@ def tearDown(self):
self._clear()

def _clear(self):
self.backend.client.indices.delete(self.backend.index, ignore=404)
self.backend.client.indices.delete("{}*".format(self.backend.index), ignore=404)
ElasticTestMixin._clear(self.backend)

def _query_backend(self, name):

self.backend.client.indices.refresh(self.backend.get_index())
metrics = self.backend.client.search(
index=self.backend.get_index(),
body={
"query": {
"term": {"name": name}
},
}
)
return metrics
return ElasticTestMixin._query_backend(self.backend, name)

def test_time_execution(self):

Expand Down Expand Up @@ -100,7 +111,6 @@ def test_metadata(*args, **kwargs):

@mock.patch('time_execution.backends.elasticsearch.logger')
def test_error_warning(self, mocked_logger):
from elasticsearch.exceptions import TransportError

transport_error = TransportError('mocked error')
es_index_error_ctx = mock.patch(
Expand Down Expand Up @@ -128,3 +138,43 @@ def test_with_origin(self):

for metric in self._query_backend(go.fqn)['hits']['hits']:
self.assertEqual(metric['_source']['origin'], 'unit_test')

def test_bulk_write(self):
    """bulk_write must index every metric so a term query finds them all."""
    points = [
        {'name': 'metric.name', 'value': value, 'timestamp': value}
        for value in (1, 2, 3)
    ]
    self.backend.bulk_write(points)
    hits = self._query_backend('metric.name')
    self.assertEqual(hits['hits']['total'], len(points))

@mock.patch('time_execution.backends.elasticsearch.logger')
def test_bulk_write_error(self, mocked_logger):
    """A TransportError from the bulk API is swallowed and logged as a warning."""
    error = TransportError('mocked error')
    payload = [1, 2, 3]
    patched_bulk = mock.patch(
        'time_execution.backends.elasticsearch.Elasticsearch.bulk',
        side_effect=error,
    )
    with patched_bulk:
        self.backend.bulk_write(payload)
    mocked_logger.warning.assert_called_once_with(
        'bulk_write metrics %r failure %r',
        payload,
        error)
135 changes: 135 additions & 0 deletions tests/test_threaded_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import time
from datetime import datetime

import mock
from freezegun import freeze_time
from tests.conftest import go
from tests.test_base_backend import TestBaseBackend
from time_execution import settings
from time_execution.backends import elasticsearch
from time_execution.backends.threaded import ThreadedBackend
from time_execution.decorator import SHORT_HOSTNAME

from .test_elasticsearch import ElasticTestMixin


class TestTimeExecution(TestBaseBackend):
    """Unit tests for ThreadedBackend with the delegate backend fully mocked.

    Timing note: these tests coordinate with the worker thread via sleeps
    tuned to the queue/bulk timeouts, so exact statement order matters.
    """

    def setUp(self):
        # Small queue and short timeout keep the worker loop fast in tests.
        self.qsize = 10
        self.qtimeout = 0.1

        # Mock both the backend instance and the class ThreadedBackend
        # instantiates, so constructor arguments can be asserted.
        self.mocked_backend = mock.Mock(spec=elasticsearch.ElasticsearchBackend)
        self.MockedBackendClass = mock.Mock(return_value=self.mocked_backend)

        self.backend = ThreadedBackend(
            self.MockedBackendClass,
            backend_args=('arg1', 'arg2'),
            backend_kwargs=dict(key1='kwarg1', key2='kwarg2'),
            queue_maxsize=self.qsize,
            queue_timeout=self.qtimeout,
        )
        # NOTE(review): true division yields 5.0 on Python 3; the float works
        # for the size comparison but an int (qsize // 2) would be cleaner.
        self.backend.bulk_size = self.qsize / 2
        self.backend.bulk_timeout = self.qtimeout * 2
        settings.configure(backends=[self.backend])

    def stop_worker(self):
        # worker_limit = 0 makes the worker's has_work() False, so the thread
        # exits and clears backend.thread; the sleep gives it one loop to do so.
        self.backend.worker_limit = 0
        time.sleep(self.qtimeout)
        self.assertEqual(self.backend.thread, None)

    def test_backend_args(self):
        # Constructor args from setUp must be forwarded to the backend class.
        self.MockedBackendClass.assert_called_with('arg1', 'arg2', key1='kwarg1', key2='kwarg2')
        # Omitting the args must call the backend class with none at all.
        ThreadedBackend(self.MockedBackendClass)
        self.MockedBackendClass.assert_called_with()

    def test_empty_queue(self):
        time.sleep(2 * self.qtimeout)  # ensures queue.get times out
        self.assertEqual(0, self.backend.fetched_items)

    def test_decorator(self):
        with freeze_time('2016-08-01 00:00:00'):
            go()
        # ensure worker thread catches up
        time.sleep(2 * self.backend.bulk_timeout)
        mocked_write = self.mocked_backend.bulk_write
        self.assertEqual(1, self.backend.fetched_items)
        # The queued metric is delivered via one bulk_write call, carrying the
        # frozen timestamp stamped when the metric was enqueued.
        mocked_write.assert_called_with([{
            'timestamp': datetime(2016, 8, 1, 0, 0),
            'hostname': SHORT_HOSTNAME,
            'name': 'tests.conftest.go',
            'value': 0.0,
        }])

    def test_double_start(self):
        self.assertEqual(0, self.backend.fetched_items)
        go()
        time.sleep(2 * self.qtimeout)
        self.assertEqual(1, self.backend.fetched_items)
        # try to double start
        self.backend.start_worker()
        # start_worker must be a no-op while the thread is alive: the
        # fetched counter is not reset.
        self.assertEqual(1, self.backend.fetched_items)

    def test_write_error(self):
        # A failing delegate must not propagate out of the worker thread.
        # NOTE(review): the worker flushes via bulk_write, not write — this
        # side_effect may never actually trigger; confirm intent.
        self.mocked_backend.write.side_effect = RuntimeError('mocked')
        go()
        time.sleep(2 * self.qtimeout)

    def test_queue_congestion(self):
        # assure worker is stopped
        self.stop_worker()

        # fill in the queue
        for _ in range(self.qsize * 2):
            go()
        self.assertTrue(self.backend._queue.full())

        # resume the worker
        self.backend.worker_limit = None
        self.backend.bulk_timeout = self.qtimeout
        self.backend.start_worker()
        # wait until all metrics are picked up
        time.sleep(self.qsize * self.qtimeout)
        # check that metrics in the queue were sent with bulk_write calls
        call_args_list = self.mocked_backend.bulk_write.call_args_list
        self.assertEqual(
            self.qsize,
            sum(len(args[0]) for args, _ in call_args_list)
        )

    def test_worker_sends_remainder(self):
        # A batch smaller than bulk_size must still be flushed when the
        # worker exits — even when that flush itself raises.
        self.stop_worker()
        self.mocked_backend.bulk_write.side_effect = RuntimeError('mock')
        loops_count = 3
        self.assertTrue(loops_count < self.backend.bulk_size)
        for _ in range(loops_count):
            go()
        self.backend.worker_limit = loops_count
        # run the worker synchronously in this thread
        self.backend.worker()
        self.assertEqual(loops_count, self.backend.fetched_items)
        mocked_bulk_write = self.mocked_backend.bulk_write
        mocked_bulk_write.assert_called_once()
        # exactly one flush containing all queued metrics
        self.assertEqual(
            loops_count,
            len(mocked_bulk_write.call_args[0][0])
        )


class TestElastic(TestBaseBackend, ElasticTestMixin):
    """Integration test: ThreadedBackend wrapping a real Elasticsearch backend.

    Requires an Elasticsearch server reachable at host ``elasticsearch``.
    """

    def setUp(self):
        self.qtime = 0.1
        threaded = ThreadedBackend(
            elasticsearch.ElasticsearchBackend,
            backend_args=('elasticsearch', ),
            backend_kwargs=dict(index='threaded-metrics'),
            queue_timeout=self.qtime,
        )
        self.backend = threaded
        settings.configure(backends=[threaded])
        # start from an empty index so hit counts are deterministic
        self._clear(threaded.backend)

    def test_write_method(self):
        go()
        # give the worker thread time to flush the queued metric
        time.sleep(2 * self.backend.bulk_timeout)
        result = self._query_backend(self.backend.backend, go.fqn)
        self.assertEqual(1, result['hits']['total'])
20 changes: 19 additions & 1 deletion time_execution/backends/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ def write(self, name, **data):
"""

data["name"] = name
data["timestamp"] = datetime.utcnow()
if not ("timestamp" in data):
data["timestamp"] = datetime.utcnow()

try:
self.client.index(
Expand All @@ -108,3 +109,20 @@ def write(self, name, **data):
)
except TransportError as exc:
logger.warning('writing metric %r failure %r', data, exc)

def bulk_write(self, metrics):
    """
    Write multiple metrics to elasticsearch in one request

    Args:
        metrics (list): data with mappings to send to elasticsearch
    """
    index = self.get_index()
    # The bulk API expects alternating action/document entries.
    actions = []
    for metric in metrics:
        actions.extend((
            {'index': {'_index': index, '_type': self.doc_type}},
            metric,
        ))
    try:
        self.client.bulk(actions)
    except TransportError as exc:
        logger.warning('bulk_write metrics %r failure %r', metrics, exc)
87 changes: 87 additions & 0 deletions time_execution/backends/threaded.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from __future__ import absolute_import

import datetime
import logging
import threading
import time

from time_execution.backends.base import BaseMetricsBackend

try:
from Queue import Queue, Empty, Full
except ImportError:
from queue import Queue, Empty, Full


logger = logging.getLogger(__file__)


class ThreadedBackend(BaseMetricsBackend):
    """Non-blocking metrics backend.

    Wraps another backend and writes metrics to it from a daemon worker
    thread: ``write`` only enqueues, and the worker drains the queue and
    flushes batches with the delegate's ``bulk_write``.
    """

    def __init__(self, backend, backend_args=None, backend_kwargs=None,
                 queue_maxsize=1000, queue_timeout=0.5, worker_limit=None):
        """
        Args:
            backend (class): backend class that is instantiated and delegated to
            backend_args (tuple): positional arguments for the backend constructor
            backend_kwargs (dict): keyword arguments for the backend constructor
            queue_maxsize (int): queue capacity; metrics are discarded (with a
                warning) once the queue is full
            queue_timeout (float): seconds the worker blocks on an empty queue
            worker_limit (int): stop the worker after fetching this many
                metrics; ``None`` means run forever (finite values are mainly
                useful in tests)
        """
        if backend_args is None:
            backend_args = tuple()
        if backend_kwargs is None:
            backend_kwargs = dict()
        self.queue_timeout = queue_timeout
        self.worker_limit = worker_limit
        self.thread = None
        self.fetched_items = 0
        # Flush a batch once it holds bulk_size metrics, or once bulk_timeout
        # seconds have passed since the previous flush.
        self.bulk_size = 50
        self.bulk_timeout = 1  # second
        self.backend = backend(*backend_args, **backend_kwargs)
        self._queue = Queue(maxsize=queue_maxsize)
        self.start_worker()

    def write(self, name, **data):
        """Queue a single metric without blocking the caller.

        A timestamp is added only when the caller did not supply one,
        mirroring ElasticsearchBackend.write (previously a caller-supplied
        timestamp was silently overwritten).
        """
        if "timestamp" not in data:
            data["timestamp"] = datetime.datetime.utcnow()
        try:
            self._queue.put_nowait((name, data))
        except Full:
            logger.warning("Discard metric %s", name)

    def start_worker(self):
        """Start the daemon worker thread; a no-op if it is already running."""
        if self.thread:
            return
        self.fetched_items = 0
        self.thread = threading.Thread(target=self.worker)
        self.thread.daemon = True
        self.thread.start()

    def batch_ready(self, batch):
        """Return True when the batch has reached the configured bulk size.

        Fixes an off-by-one: ``self.bulk_size < len(batch)`` only flushed
        once the batch exceeded bulk_size by one element.
        """
        return len(batch) >= self.bulk_size

    def batch_time(self, last_write):
        """Return True when bulk_timeout seconds have elapsed since *last_write*."""
        return (time.time() - last_write) >= self.bulk_timeout

    def has_work(self):
        """Return True while the worker should keep fetching metrics."""
        if self.worker_limit is None:
            return True
        return self.fetched_items < self.worker_limit

    def worker(self):
        """Worker loop: drain the queue and flush batches to the delegate."""
        metrics = []
        last_write = time.time()

        def send_metrics():
            # Best effort: a failing delegate must never kill the worker.
            try:
                self.backend.bulk_write(metrics)
            except Exception as exc:
                logger.warning('%r write failure %r', self.backend, exc)

        while self.has_work():
            # Flush when the batch is full, or when it is non-empty and the
            # flush interval has elapsed.
            if self.batch_ready(metrics) or (self.batch_time(last_write) and metrics):
                send_metrics()
                last_write = time.time()
                metrics = []
            try:  # blocking get keeps the loop from busy-waiting
                name, data = self._queue.get(True, self.queue_timeout)
            except Empty:
                continue
            self.fetched_items += 1
            data['name'] = name
            metrics.append(data)
        # Flush whatever remains before the thread exits.
        if metrics:
            send_metrics()
        self.thread = None

0 comments on commit 9778f39

Please sign in to comment.