From dcfca3cc03c371f8cdd00539e3158849fc86e868 Mon Sep 17 00:00:00 2001
From: rainjay
Date: Sun, 1 Apr 2018 11:48:04 +0800
Subject: [PATCH 01/19] support async bulk api

---
 aioelasticsearch/helpers.py | 123 +++++++++++++++++++++++++++++++++++-
 1 file changed, 122 insertions(+), 1 deletion(-)

diff --git a/aioelasticsearch/helpers.py b/aioelasticsearch/helpers.py
index 484f035d..0a3441cd 100644
--- a/aioelasticsearch/helpers.py
+++ b/aioelasticsearch/helpers.py
@@ -1,9 +1,12 @@
 import asyncio
 import logging
+from operator import methodcaller
+from typing import List
 
 from aioelasticsearch import NotFoundError
-from elasticsearch.helpers import ScanError
+from elasticsearch.helpers import ScanError, _chunk_actions, expand_action
+from elasticsearch.exceptions import TransportError
 
 from .compat import PY_352
 
@@ -147,3 +150,121 @@ def _update_state(self, resp):
         self._successful_shards = resp['_shards']['successful']
         self._total_shards = resp['_shards']['total']
         self._done = not self._hits or self._scroll_id is None
+
+
+async def worker_bulk(client, datas: List[dict], actions: List[str], **kwargs):
+    try:
+        resp = await client.bulk("\n".join(actions) + '\n', **kwargs)
+    except TransportError as e:
+        return e, datas
+    fail_actions = []
+    finish_count = 0
+    for data, (op_type, item) in zip(datas, map(methodcaller('popitem'),
+                                                resp['items'])):
+        ok = 200 <= item.get('status', 500) < 300
+        if not ok:
+            fail_actions.append(data)
+        else:
+            finish_count += 1
+    return finish_count, fail_actions
+
+
+def _get_fail_data(results, serializer):
+    finish_count = 0
+    bulk_action = []
+    bulk_data = []
+    lazy_exception = None
+    for result in results:
+        if isinstance(result[0], int):
+            finish_count += result[0]
+        else:
+            if lazy_exception is None:
+                lazy_exception = result[0]
+
+        for fail_data in result[1]:
+            for _ in fail_data:
+                bulk_data.append(_)
+        if result[1]:
+            bulk_action.extend(map(serializer.dumps,result[1]))
+    return finish_count, bulk_data, bulk_action, lazy_exception
+
+
+async def _retry_handler(client, futures, max_retries, initial_backoff,
+                         max_backoff, **kwargs):
+    finish = 0
+    for attempt in range(max_retries + 1):
+        if attempt:
+            sleep = min(max_backoff, initial_backoff * 2 ** (attempt - 1))
+            await asyncio.sleep(sleep)
+
+        results = await asyncio.gather(*futures,
+                                       return_exceptions=True)
+        futures = []
+
+        count, fail_data, fail_action, lazy_exception = \
+            _get_fail_data(results, client.transport.serializer)
+
+        finish += count
+
+        if not fail_action or attempt == max_retries:
+            break
+
+        coroutine = worker_bulk(client, fail_data, fail_action, **kwargs)
+        futures.append(asyncio.ensure_future(coroutine))
+
+    if lazy_exception:
+        raise lazy_exception
+
+    return finish, fail_data
+
+
+async def bulk(client, actions, concurrency_limit=2, chunk_size=500,
+               max_chunk_bytes=100 * 1024 * 1024,
+               expand_action_callback=expand_action, max_retries=0,
+               initial_backoff=2, max_backoff=600, stats_only=False, **kwargs):
+
+    async def concurrency_wrapper(chunk_iter):
+
+        partial_count = 0
+        if stats_only:
+            partial_fail = 0
+        else:
+            partial_fail = []
+        for bulk_data, bulk_action in chunk_iter:
+            futures = [worker_bulk(client, bulk_data, bulk_action, **kwargs)]
+            count, fails = await _retry_handler(client,
+                                                futures,
+                                                max_retries,
+                                                initial_backoff,
+                                                max_backoff, **kwargs)
+            partial_count += count
+            if stats_only:
+                partial_fail += len(fails)
+            else:
+                partial_fail.extend(fails)
+        return partial_count, partial_fail
+
+    actions = map(expand_action_callback, actions)
+    finish_count = 0
+    if stats_only:
+        fail_datas = 0
+    else:
+        fail_datas = []
+
+    chunk_action_iter = _chunk_actions(actions, chunk_size, max_chunk_bytes,
+                                       client.transport.serializer)
+
+    tasks = []
+    concurrency_limit = concurrency_limit if concurrency_limit > 0 else 2
+    for i in range(concurrency_limit):
+        tasks.append(concurrency_wrapper(chunk_action_iter))
+
+    results = await asyncio.gather(*tasks)
+    for p_count, p_fails in results:
+        finish_count += p_count
+        if stats_only:
+            fail_datas += p_fails
+        else:
+            fail_datas.extend(p_fails)
+
+    return finish_count, fail_datas
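The `_retry_handler` added above waits `min(max_backoff, initial_backoff * 2 ** (attempt - 1))` seconds before each retry: exponential backoff capped at `max_backoff`, with no delay before the first attempt. A minimal standalone sketch of the same schedule; `attempt_once` is a hypothetical stand-in for a single bulk request, and retrying on `ConnectionError` is an illustrative assumption, not the helper's actual error handling.

.. code-block:: python

    import asyncio


    async def retry_with_backoff(attempt_once, max_retries=3,
                                 initial_backoff=2, max_backoff=600):
        # Attempt 0 runs immediately; retries sleep 2s, 4s, 8s, ...
        # capped at max_backoff, matching the schedule above.
        for attempt in range(max_retries + 1):
            if attempt:
                sleep = min(max_backoff, initial_backoff * 2 ** (attempt - 1))
                await asyncio.sleep(sleep)
            try:
                return await attempt_once()
            except ConnectionError:
                if attempt == max_retries:
                    raise

With the defaults used here (`initial_backoff=2`, `max_backoff=600`) the waits grow 2, 4, 8, 16, ... seconds until the cap is reached.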
From ee8551eb052519b306ca6a4c45f9033a67a9844a Mon Sep 17 00:00:00 2001
From: rainjay
Date: Mon, 2 Apr 2018 00:32:16 +0800
Subject: [PATCH 02/19] init test bulk

---
 tests/test_bulk.py | 53 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 53 insertions(+)
 create mode 100644 tests/test_bulk.py

diff --git a/tests/test_bulk.py b/tests/test_bulk.py
new file mode 100644
index 00000000..ccb255b0
--- /dev/null
+++ b/tests/test_bulk.py
@@ -0,0 +1,53 @@
+# -*- coding: utf-8 -*-
+import logging
+
+from unittest import mock
+
+import pytest
+
+from aioelasticsearch import NotFoundError
+from aioelasticsearch.helpers import bulk
+
+
+logger = logging.getLogger('elasticsearch')
+
+def gen_data1():
+    for i in range(10):
+        yield {"_index": "test_aioes",
+               "_type": "type_3",
+               "_id": str(i),
+               "foo": "1"}
+
+def gen_data2():
+    for i in range(10,20):
+        yield {"_index": "test_aioes",
+               "_type": "type_3",
+               "_id": str(i),
+               "_source": {"foo": "1"}
+               }
+
+
+@pytest.mark.run_loop
+async def test_bulk_simple(es):
+    success, fails = await bulk(es, gen_data1(),
+                                concurrency_limit=2,
+                                stats_only=True)
+    await success == 10
+    assert fails == 0
+
+
+    success, fails = await bulk(es, gen_data2(),
+                                concurrency_limit=2,
+                                stats_only=True)
+    await success == 10
+    assert fails == 0
+
+@pytest.mark.run_loop
+async def test_bulk_fails(es):
+    datas = [{'op_type': 'delete',
+              '_index': 'test_aioes',
+              '_type': 'type_3', '_id': "999"}
+             ]
+    success, fails = await bulk(es,datas,stats_only=True)
+    await success == 0
+    await success == 1
\ No newline at end of file

From a7374072be983268b99568433c0a8cb975a187b5 Mon Sep 17 00:00:00 2001
From: RainJay
Date: Mon, 2 Apr 2018 09:21:42 +0800
Subject: [PATCH 03/19] fix assert error

---
 tests/test_bulk.py | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/tests/test_bulk.py b/tests/test_bulk.py
index ccb255b0..32cda87a 100644
--- a/tests/test_bulk.py
+++ b/tests/test_bulk.py
@@ -1,16 +1,13 @@
 # -*- coding: utf-8 -*-
 import logging
 
-from unittest import mock
-
 import pytest
 
-from aioelasticsearch import NotFoundError
 from aioelasticsearch.helpers import bulk
 
-
 logger = logging.getLogger('elasticsearch')
 
+
 def gen_data1():
     for i in range(10):
         yield {"_index": "test_aioes",
@@ -18,8 +15,9 @@ def gen_data1():
                "_id": str(i),
                "foo": "1"}
 
+
 def gen_data2():
-    for i in range(10,20):
+    for i in range(10, 20):
         yield {"_index": "test_aioes",
                "_type": "type_3",
                "_id": str(i),
@@ -32,22 +30,22 @@ async def test_bulk_simple(es):
     success, fails = await bulk(es, gen_data1(),
                                 concurrency_limit=2,
                                 stats_only=True)
-    await success == 10
+    assert success == 10
     assert fails == 0
 
-
     success, fails = await bulk(es, gen_data2(),
                                 concurrency_limit=2,
                                 stats_only=True)
-    await success == 10
+    assert success == 10
     assert fails == 0
 
+
 @pytest.mark.run_loop
 async def test_bulk_fails(es):
     datas = [{'op_type': 'delete',
               '_index': 'test_aioes',
               '_type': 'type_3', '_id': "999"}
              ]
-    success, fails = await bulk(es,datas,stats_only=True)
-    await success == 0
-    await success == 1
\ No newline at end of file
+    success, fails = await bulk(es, datas, stats_only=True)
+    assert success == 0
+    assert success == 1
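The two generators in these tests cover both action shapes accepted by `expand_action` (reused from `elasticsearch.helpers`): metadata with the document fields inline, and metadata with an explicit `_source`. A rough sketch of that split, assuming the elasticsearch-py semantics; it is not the library code itself.

.. code-block:: python

    def expand_action_sketch(action):
        # Underscore-prefixed metadata keys move to the action line;
        # the body comes from '_source' or from the leftover fields.
        meta_keys = {'_index', '_type', '_id', '_parent', '_routing',
                     '_version', '_version_type', '_retry_on_conflict'}
        action = action.copy()
        op_type = action.pop('_op_type', 'index')
        meta = {k: action.pop(k) for k in list(action) if k in meta_keys}
        if op_type == 'delete':
            return {op_type: meta}, None     # delete actions carry no body
        return {op_type: meta}, action.pop('_source', None) or action

    print(expand_action_sketch({'_index': 'test_aioes', '_type': 'type_3',
                                '_id': '1', 'foo': '1'}))
    # ({'index': {'_index': 'test_aioes', '_type': 'type_3', '_id': '1'}},
    #  {'foo': '1'})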
From f284685cbf25d8cc3a87ee812e174abf884d8291 Mon Sep 17 00:00:00 2001
From: RainJay
Date: Mon, 2 Apr 2018 21:02:38 +0800
Subject: [PATCH 04/19] remove typing

---
 aioelasticsearch/helpers.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/aioelasticsearch/helpers.py b/aioelasticsearch/helpers.py
index 0a3441cd..8356dea1 100644
--- a/aioelasticsearch/helpers.py
+++ b/aioelasticsearch/helpers.py
@@ -2,7 +2,6 @@ import asyncio
 import logging
 from operator import methodcaller
-from typing import List
 
 from aioelasticsearch import NotFoundError
 from elasticsearch.helpers import ScanError, _chunk_actions, expand_action
@@ -152,7 +151,7 @@ def _update_state(self, resp):
     self._done = not self._hits or self._scroll_id is None
 
 
-async def worker_bulk(client, datas: List[dict], actions: List[str], **kwargs):
+async def worker_bulk(client, datas , actions, **kwargs):
     try:
         resp = await client.bulk("\n".join(actions) + '\n', **kwargs)
     except TransportError as e:

From de038706af882ed926648447b661a32af8ead0b5 Mon Sep 17 00:00:00 2001
From: rainjay
Date: Mon, 2 Apr 2018 22:07:55 +0800
Subject: [PATCH 05/19] update bulk feature info

---
 README.rst | 40 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 40 insertions(+)

diff --git a/README.rst b/README.rst
index c1564528..13aee3b1 100644
--- a/README.rst
+++ b/README.rst
@@ -68,6 +68,46 @@ Asynchronous `scroll `_
 
+Asynchronous `bulk `_
+
+.. code-block:: python
+
+    import asyncio
+
+    from aioelasticsearch import Elasticsearch
+    from aioelasticsearch.helpers import bulk
+
+    def gen_data():
+        for i in range(10):
+            yield { "_index" : "test",
+                    "_type" : "_doc",
+                    "_id" : str(i),
+                    "FIELD1": "TEXT",
+                    }
+    def gen_data2():
+        for i in range(10):
+            yield { "_index" : "test",
+                    "_type" : "_doc",
+                    "_id" : str(i),
+                    "_source":{
+                        "FIELD1": "TEXT",
+                    }
+                }
+
+
+    async def go():
+        async with Elasticsearch() as es:
+            success, fails = \
+                await bulk(es, gen_data(),
+                           concurrency_limit=10)
+
+
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(go())
+    loop.close()
+
+
 Thanks
 ------
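The README snippet above discards per-document failures, which `bulk` reports through its return value instead of raising. A hedged variant of the same example that checks the result, assuming an Elasticsearch node on localhost:

.. code-block:: python

    import asyncio

    from aioelasticsearch import Elasticsearch
    from aioelasticsearch.helpers import bulk


    async def go():
        async with Elasticsearch() as es:
            docs = ({"_index": "test", "_type": "_doc",
                     "_id": str(i), "FIELD1": "TEXT"} for i in range(10))
            # With stats_only=True both values are counters; without it,
            # the second value is the list of failed actions.
            success, fails = await bulk(es, docs, stats_only=True)
            print('indexed:', success, 'failed:', fails)


    loop = asyncio.get_event_loop()
    loop.run_until_complete(go())
    loop.close()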
From ce9bd9b74ba3901f1f4324a362adf2dcfb39d795 Mon Sep 17 00:00:00 2001
From: RainJay
Date: Mon, 2 Apr 2018 09:21:42 +0800
Subject: [PATCH 06/19] fix assert error

---
 tests/test_bulk.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/test_bulk.py b/tests/test_bulk.py
index 32cda87a..c6215a80 100644
--- a/tests/test_bulk.py
+++ b/tests/test_bulk.py
@@ -5,6 +5,7 @@
 
 from aioelasticsearch.helpers import bulk
 
+
 logger = logging.getLogger('elasticsearch')
 

From d3b7091b6bb3ea899ed9d335d688913beb0aa254 Mon Sep 17 00:00:00 2001
From: rainjay
Date: Sat, 7 Apr 2018 23:54:22 +0800
Subject: [PATCH 07/19] fix retry bug

---
 aioelasticsearch/helpers.py | 53 ++++++++++++++-----------------------
 1 file changed, 20 insertions(+), 33 deletions(-)

diff --git a/aioelasticsearch/helpers.py b/aioelasticsearch/helpers.py
index 8356dea1..2384cab2 100644
--- a/aioelasticsearch/helpers.py
+++ b/aioelasticsearch/helpers.py
@@ -151,7 +151,7 @@ def _update_state(self, resp):
     self._done = not self._hits or self._scroll_id is None
 
 
-async def worker_bulk(client, datas , actions, **kwargs):
+async def _process_bulk(client, datas, actions, **kwargs):
     try:
         resp = await client.bulk("\n".join(actions) + '\n', **kwargs)
     except TransportError as e:
@@ -168,53 +168,40 @@ async def _process_bulk(client, datas, actions, **kwargs):
     return finish_count, fail_actions
 
 
-def _get_fail_data(results, serializer):
-    finish_count = 0
-    bulk_action = []
+async def _retry_handler(client, coroutine, max_retries, initial_backoff,
+                         max_backoff, **kwargs):
+    finish = 0
     bulk_data = []
-    lazy_exception = None
-    for result in results:
+    for attempt in range(max_retries + 1):
+        bulk_data = []
+        bulk_action = []
+        lazy_exception = None
+
+        if attempt:
+            sleep = min(max_backoff, initial_backoff * 2 ** (attempt - 1))
+            await asyncio.sleep(sleep, loop=client.loop)
+
+        result = await coroutine
         if isinstance(result[0], int):
-            finish_count += result[0]
+            finish += result[0]
         else:
-            if lazy_exception is None:
-                lazy_exception = result[0]
+            lazy_exception = result[0]
 
         for fail_data in result[1]:
             for _ in fail_data:
                 bulk_data.append(_)
         if result[1]:
-            bulk_action.extend(map(serializer.dumps,result[1]))
-    return finish_count, bulk_data, bulk_action, lazy_exception
-
-
-async def _retry_handler(client, futures, max_retries, initial_backoff,
-                         max_backoff, **kwargs):
-    finish = 0
-    for attempt in range(max_retries + 1):
-        if attempt:
-            sleep = min(max_backoff, initial_backoff * 2 ** (attempt - 1))
-            await asyncio.sleep(sleep)
-
-        results = await asyncio.gather(*futures,
-                                       return_exceptions=True)
-        futures = []
-
-        count, fail_data, fail_action, lazy_exception = \
-            _get_fail_data(results, client.transport.serializer)
-
-        finish += count
+            bulk_action.extend(map(client.transport.serializer.dumps, result[1]))
 
-        if not fail_action or attempt == max_retries:
+        if not bulk_action or attempt == max_retries:
             break
 
-        coroutine = worker_bulk(client, fail_data, fail_action, **kwargs)
-        futures.append(asyncio.ensure_future(coroutine))
+        coroutine = _process_bulk(client, bulk_data, bulk_action, **kwargs)
 
     if lazy_exception:
         raise lazy_exception
 
-    return finish, fail_data
+    return finish, bulk_data

From 4ecef47ccb2ce4d02108fea3b17c0d34266196a9 Mon Sep 17 00:00:00 2001
From: rainjay
Date: Sun, 8 Apr 2018 00:04:51 +0800
Subject: [PATCH 08/19] update test

---
 tests/test_bulk.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/tests/test_bulk.py b/tests/test_bulk.py
index c6215a80..a06ccc80 100644
--- a/tests/test_bulk.py
+++ b/tests/test_bulk.py
@@ -29,13 +29,11 @@ def gen_data2():
 @pytest.mark.run_loop
 async def test_bulk_simple(es):
     success, fails = await bulk(es, gen_data1(),
-                                concurrency_limit=2,
                                 stats_only=True)
     assert success == 10
     assert fails == 0
 
     success, fails = await bulk(es, gen_data2(),
-                                concurrency_limit=2,
                                 stats_only=True)
     assert success == 10
     assert fails == 0
@@ -43,10 +41,11 @@ async def test_bulk_simple(es):
 
 @pytest.mark.run_loop
 async def test_bulk_fails(es):
-    datas = [{'op_type': 'delete',
+    datas = [{'_op_type': 'delete',
               '_index': 'test_aioes',
               '_type': 'type_3', '_id': "999"}
              ]
+
     success, fails = await bulk(es, datas, stats_only=True)
     assert success == 0
-    assert success == 1
+    assert fails == 1
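One subtlety behind the rewritten `_retry_handler`: a coroutine object can be awaited only once, so the loop must build a fresh `coroutine` with `_process_bulk(...)` before each retry rather than awaiting the old one again. A small self-contained demonstration of that constraint:

.. code-block:: python

    import asyncio


    async def one_shot():
        return 42


    async def main():
        coro = one_shot()
        print(await coro)    # 42
        try:
            await coro       # second await of the same coroutine object
        except RuntimeError as exc:
            print(exc)       # cannot reuse already awaited coroutine


    asyncio.get_event_loop().run_until_complete(main())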
From 6a8cda212cdae9c47e65d65abcd9ae58e0659dea Mon Sep 17 00:00:00 2001
From: rainjay
Date: Sun, 8 Apr 2018 00:20:13 +0800
Subject: [PATCH 09/19] fix concurrency bulk bug and add testing

---
 aioelasticsearch/helpers.py | 49 +++++++++++++++++++++++++++----------
 tests/test_bulk.py          | 18 +++++++++++++-
 2 files changed, 53 insertions(+), 14 deletions(-)

diff --git a/aioelasticsearch/helpers.py b/aioelasticsearch/helpers.py
index 2384cab2..4ec67070 100644
--- a/aioelasticsearch/helpers.py
+++ b/aioelasticsearch/helpers.py
@@ -209,26 +209,49 @@ async def bulk(client, actions, concurrency_limit=2, chunk_size=500,
                expand_action_callback=expand_action, max_retries=0,
                initial_backoff=2, max_backoff=600, stats_only=False, **kwargs):
 
-    async def concurrency_wrapper(chunk_iter):
+    finish_count = 0
+    if stats_only:
+        fail_datas = 0
+    else:
+        fail_datas = []
+
+    chunk_action_iter = _chunk_actions(actions, chunk_size, max_chunk_bytes,
+                                       client.transport.serializer)
 
-        partial_count = 0
+    for bulk_data, bulk_action in chunk_action_iter:
+        coroutine = _process_bulk(client, bulk_data, bulk_action, **kwargs)
+        count, fails = await _retry_handler(client,
+                                            coroutine,
+                                            max_retries,
+                                            initial_backoff,
+                                            max_backoff, **kwargs)
+        finish_count += count
         if stats_only:
-            partial_fail = 0
+            fail_datas += len(fails)
         else:
-            partial_fail = []
-        for bulk_data, bulk_action in chunk_iter:
-            futures = [worker_bulk(client, bulk_data, bulk_action, **kwargs)]
+            fail_datas.extend(fails)
+
+    return finish_count, fail_datas
+
+
+async def concurrency_bulk(client, actions, concurrency_count=4,
+                           chunk_size=500, max_retries=0,
+                           max_chunk_bytes=100 * 1024 * 1024,
+                           expand_action_callback=expand_action,
+                           initial_backoff=2, max_backoff=600, **kwargs):
+
+    async def concurrency_wrapper(action_iter):
+        p_count = p_fails = 0
+        for bulk_data, bulk_action in action_iter:
+            coroutine = _process_bulk(client, bulk_data, bulk_action, **kwargs)
             count, fails = await _retry_handler(client,
-                                                futures,
+                                                coroutine,
                                                 max_retries,
                                                 initial_backoff,
                                                 max_backoff, **kwargs)
-            partial_count += count
-            if stats_only:
-                partial_fail += len(fails)
-            else:
-                partial_fail.extend(fails)
-        return partial_count, partial_fail
+            p_count += count
+            p_fails += len(fails)
+        return p_count, p_fails
 
     actions = map(expand_action_callback, actions)
     finish_count = 0
diff --git a/tests/test_bulk.py b/tests/test_bulk.py
index a06ccc80..64c110d4 100644
--- a/tests/test_bulk.py
+++ b/tests/test_bulk.py
@@ -3,7 +3,7 @@
 
 import pytest
 
-from aioelasticsearch.helpers import bulk
+from aioelasticsearch.helpers import bulk, concurrency_bulk
 
 logger = logging.getLogger('elasticsearch')
@@ -38,6 +38,11 @@ async def test_bulk_simple(es):
     assert success == 10
     assert fails == 0
 
+    success, fails = await bulk(es, gen_data1(),
+                                stats_only=False)
+    assert success == 10
+    assert fails is []
+
 
 @pytest.mark.run_loop
 async def test_bulk_fails(es):
@@ -49,3 +54,14 @@ async def test_bulk_fails(es):
     success, fails = await bulk(es, datas, stats_only=True)
     assert success == 0
     assert fails == 1
+
+
+@pytest.mark.run_loop
+async def test_concurrency_bulk(es):
+    success, fails = await concurrency_bulk(es, gen_data1())
+    assert success == 10
+    assert fails == 0
+
+    success, fails = await concurrency_bulk(es, gen_data2())
+    assert success == 10
+    assert fails == 0
\ No newline at end of file

From fc282e3b1276f89eb8a9f141158d27d1399745ce Mon Sep 17 00:00:00 2001
From: rainjay
Date: Sun, 8 Apr 2018 00:24:14 +0800
Subject: [PATCH 10/19] fix bug

---
 tests/test_bulk.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_bulk.py b/tests/test_bulk.py
index 64c110d4..6c14a154 100644
--- a/tests/test_bulk.py
+++ b/tests/test_bulk.py
@@ -41,7 +41,7 @@ async def test_bulk_simple(es):
     success, fails = await bulk(es, gen_data1(),
                                 stats_only=False)
     assert success == 10
-    assert fails is []
+    assert fails == []
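`concurrency_bulk` hands one shared `_chunk_actions` iterator to every worker. Because the event loop is single-threaded and plain `next()` calls are synchronous, each chunk is consumed by exactly one worker with no locking. A sketch of that shared-iterator pattern; `worker` and the integer chunks are stand-ins, not the helper's real payloads.

.. code-block:: python

    import asyncio


    async def worker(name, shared_chunks):
        handled = 0
        for chunk in shared_chunks:   # pulls the next unclaimed chunk
            await asyncio.sleep(0)    # stand-in for an awaited bulk request
            handled += 1
        return name, handled


    async def main():
        chunks = iter(range(10))      # stand-in for _chunk_actions output
        counts = await asyncio.gather(*(worker(i, chunks) for i in range(4)))
        print(counts)                 # ten chunks split across four workers


    asyncio.get_event_loop().run_until_complete(main())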
 1 file changed, 13 insertions(+), 19 deletions(-)

diff --git a/aioelasticsearch/helpers.py b/aioelasticsearch/helpers.py
index 4ec67070..cc7b4ce4 100644
--- a/aioelasticsearch/helpers.py
+++ b/aioelasticsearch/helpers.py
@@ -204,10 +204,11 @@ async def _retry_handler(client, coroutine, max_retries, initial_backoff,
     return finish, bulk_data
 
 
-async def bulk(client, actions, concurrency_limit=2, chunk_size=500,
+async def bulk(client, actions, chunk_size=500, max_retries=0,
                max_chunk_bytes=100 * 1024 * 1024,
-               expand_action_callback=expand_action, max_retries=0,
-               initial_backoff=2, max_backoff=600, stats_only=False, **kwargs):
+               expand_action_callback=expand_action, initial_backoff=2,
+               max_backoff=600, stats_only=False, **kwargs):
+
     actions = map(expand_action_callback, actions)
     finish_count = 0
     if stats_only:
@@ -254,26 +255,19 @@ async def concurrency_wrapper(action_iter):
         return p_count, p_fails
 
     actions = map(expand_action_callback, actions)
-    finish_count = 0
-    if stats_only:
-        fail_datas = 0
-    else:
-        fail_datas = []
-
     chunk_action_iter = _chunk_actions(actions, chunk_size, max_chunk_bytes,
                                        client.transport.serializer)
 
     tasks = []
-    concurrency_limit = concurrency_limit if concurrency_limit > 0 else 2
-    for i in range(concurrency_limit):
+    for i in range(concurrency_count):
         tasks.append(concurrency_wrapper(chunk_action_iter))
 
-    results = await asyncio.gather(*tasks)
-    for p_count, p_fails in results:
-        finish_count += p_count
-        if stats_only:
-            fail_datas += p_fails
-        else:
-            fail_datas.extend(p_fails)
+    results = await asyncio.gather(*tasks, loop=client.loop)
 
-    return finish_count, fail_datas
+    finish_count = 0
+    fail_count = 0
+    for p_finish, p_fail in results:
+        finish_count += p_finish
+        fail_count += p_fail
+
+    return finish_count, fail_count

From 41019c4f169d1eb443f5aa4ceb7ebf385fec8f0e Mon Sep 17 00:00:00 2001
From: rainjay
Date: Sun, 8 Apr 2018 01:27:54 +0800
Subject: [PATCH 12/19] add testing

---
 tests/test_bulk.py | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/tests/test_bulk.py b/tests/test_bulk.py
index 6c14a154..462fe243 100644
--- a/tests/test_bulk.py
+++ b/tests/test_bulk.py
@@ -4,7 +4,7 @@
 import pytest
 
 from aioelasticsearch.helpers import bulk, concurrency_bulk
-
+from aioelasticsearch import Elasticsearch, TransportError
 
 logger = logging.getLogger('elasticsearch')
@@ -64,4 +64,15 @@ async def test_concurrency_bulk(es):
 
     success, fails = await concurrency_bulk(es, gen_data2())
     assert success == 10
-    assert fails == 0
\ No newline at end of file
+    assert fails == 0
+
+
+@pytest.mark.run_loop
+async def test_bulk_raise_exception():
+    es = Elasticsearch()
+    datas = [{'_op_type': 'delete',
+              '_index': 'test_aioes',
+              '_type': 'type_3', '_id': "999"}
+             ]
+    with pytest.raises(TransportError):
+        success, fails = await bulk(es, datas, stats_only=True)

From c62496016f06c6ad2b6b2d015f790be32f335216 Mon Sep 17 00:00:00 2001
From: rainjay
Date: Sun, 8 Apr 2018 01:33:46 +0800
Subject: [PATCH 13/19] fix bug

---
 tests/test_bulk.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_bulk.py b/tests/test_bulk.py
index 462fe243..7906ae44 100644
--- a/tests/test_bulk.py
+++ b/tests/test_bulk.py
@@ -68,7 +68,7 @@ async def test_concurrency_bulk(es):
 
 @pytest.mark.run_loop
-async def test_bulk_raise_exception():
+async def test_bulk_raise_exception(loop):
     es = Elasticsearch()
     datas = [{'_op_type': 'delete',
               '_index': 'test_aioes',
               '_type': 'type_3', '_id': "999"}
              ]

From 4c0d3bb7708d29140709ac818e1d7402a0627ad5 Mon Sep 17 00:00:00 2001
From: rainjay
Date: Sun, 8 Apr
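After the rebase the two helpers return different shapes: `bulk` works through the chunks sequentially and returns the failed actions themselves (or a counter with `stats_only=True`), while `concurrency_bulk` always returns two plain counters. A usage sketch, again assuming a local node:

.. code-block:: python

    import asyncio

    from aioelasticsearch import Elasticsearch
    from aioelasticsearch.helpers import bulk, concurrency_bulk


    async def go():
        async with Elasticsearch() as es:
            docs = [{"_index": "test", "_type": "_doc",
                     "_id": str(i), "foo": "bar"} for i in range(100)]
            success, fails = await bulk(es, docs)          # fails: list
            done, failed = await concurrency_bulk(
                es, docs, concurrency_count=4)             # failed: int


    loop = asyncio.get_event_loop()
    loop.run_until_complete(go())
    loop.close()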
 2018 01:41:24 +0800
Subject: [PATCH 14/19] support py35

---
 tests/test_bulk.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/tests/test_bulk.py b/tests/test_bulk.py
index 7906ae44..8ae9115d 100644
--- a/tests/test_bulk.py
+++ b/tests/test_bulk.py
@@ -1,4 +1,5 @@
 # -*- coding: utf-8 -*-
+import asyncio
 import logging
 
 import pytest
@@ -69,6 +70,8 @@ async def test_concurrency_bulk(es):
 
 @pytest.mark.run_loop
 async def test_bulk_raise_exception(loop):
+
+    asyncio.set_event_loop(loop)
     es = Elasticsearch()
     datas = [{'_op_type': 'delete',
               '_index': 'test_aioes',

From 54479b21acaef0aff2e4f7c84f80e8810ae109b5 Mon Sep 17 00:00:00 2001
From: rainjay
Date: Sun, 8 Apr 2018 01:47:30 +0800
Subject: [PATCH 15/19] update testing

---
 tests/test_bulk.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_bulk.py b/tests/test_bulk.py
index 8ae9115d..36b57392 100644
--- a/tests/test_bulk.py
+++ b/tests/test_bulk.py
@@ -52,7 +52,7 @@ async def test_bulk_fails(es):
               '_type': 'type_3', '_id': "999"}
              ]
 
-    success, fails = await bulk(es, datas, stats_only=True)
+    success, fails = await bulk(es, datas, stats_only=True,max_retries=5)
     assert success == 0
     assert fails == 1

From 1dcddcd0ac6ed6bb1f008f06dcf6f4c301148355 Mon Sep 17 00:00:00 2001
From: rainjay
Date: Sun, 8 Apr 2018 02:48:31 +0800
Subject: [PATCH 16/19] fix retry bug

---
 aioelasticsearch/helpers.py | 26 +++++++++++++++++++-------
 tests/test_bulk.py          |  2 +-
 2 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/aioelasticsearch/helpers.py b/aioelasticsearch/helpers.py
index cc7b4ce4..83cd981f 100644
--- a/aioelasticsearch/helpers.py
+++ b/aioelasticsearch/helpers.py
@@ -170,15 +170,17 @@ async def _process_bulk(client, datas, actions, **kwargs):
 
 async def _retry_handler(client, coroutine, max_retries, initial_backoff,
                          max_backoff, **kwargs):
+
     finish = 0
     bulk_data = []
     for attempt in range(max_retries + 1):
-        bulk_data = []
         bulk_action = []
         lazy_exception = None
 
         if attempt:
             sleep = min(max_backoff, initial_backoff * 2 ** (attempt - 1))
+            print(f"Retry {attempt}, sleep {sleep}")
+            logger.debug(f"Retry {attempt}, sleep {sleep}")
             await asyncio.sleep(sleep, loop=client.loop)
 
         result = await coroutine
@@ -187,11 +189,20 @@ async def _retry_handler(client, coroutine, max_retries, initial_backoff,
         else:
             lazy_exception = result[0]
 
-        for fail_data in result[1]:
-            for _ in fail_data:
-                bulk_data.append(_)
-        if result[1]:
-            bulk_action.extend(map(client.transport.serializer.dumps, result[1]))
+        bulk_data = result[1]
+
+        for tuple_data in bulk_data:
+
+            data = action = None
+            if len(tuple_data) == 2:
+                data = tuple_data[1]
+            action = tuple_data[0]
+
+            action = client.transport.serializer.dumps(action)
+            bulk_action.append(action)
+            if data is not None:
+                data = client.transport.serializer.dumps(data)
+                bulk_action.append(data)
 
         if not bulk_action or attempt == max_retries:
             break
diff --git a/tests/test_bulk.py b/tests/test_bulk.py
index 36b57392..803c2942 100644
--- a/tests/test_bulk.py
+++ b/tests/test_bulk.py
@@ -52,7 +52,7 @@ async def test_bulk_fails(es):
               '_type': 'type_3', '_id': "999"}
              ]
 
-    success, fails = await bulk(es, datas, stats_only=True,max_retries=5)
+    success, fails = await bulk(es, datas, stats_only=True, max_retries=1)
     assert success == 0
     assert fails == 1

From 1df5ab10ec236cb92817f577db8aadf5be45af9e Mon Sep 17 00:00:00 2001
From: rainjay
Date: Sun, 8 Apr 2018 11:20:55 +0800
Subject: [PATCH 17/19] support py35 string format

---
 aioelasticsearch/helpers.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/aioelasticsearch/helpers.py b/aioelasticsearch/helpers.py
index 83cd981f..d627ab12 100644
--- a/aioelasticsearch/helpers.py
+++ b/aioelasticsearch/helpers.py
@@ -179,8 +179,7 @@ async def _retry_handler(client, coroutine, max_retries, initial_backoff,
 
         if attempt:
             sleep = min(max_backoff, initial_backoff * 2 ** (attempt - 1))
-            print(f"Retry {attempt}, sleep {sleep}")
-            logger.debug(f"Retry {attempt}, sleep {sleep}")
+            logger.debug('Retry %d count, sleep %d second.', attempt, sleep)
             await asyncio.sleep(sleep, loop=client.loop)
 
         result = await coroutine
@@ -192,8 +191,7 @@ async def _retry_handler(client, coroutine, max_retries, initial_backoff,
         bulk_data = result[1]
 
         for tuple_data in bulk_data:
-
-            data = action = None
+            data = None
             if len(tuple_data) == 2:
                 data = tuple_data[1]
             action = tuple_data[0]
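Patch 16 turns each failed `(action, data)` tuple back into the newline-delimited lines that `_process_bulk` joins and sends to `client.bulk`; a one-element tuple is an action without a body, such as a delete. A round-trip sketch using the standard `json` module in place of `client.transport.serializer`:

.. code-block:: python

    import json

    failed = [({'index': {'_index': 'test_aioes', '_id': '100'}},
               {'name': 'Karl 1', 'email': 'karl@example.com'}),
              ({'delete': {'_index': 'test_aioes', '_id': '101'}},)]

    lines = []
    for tuple_data in failed:
        data = None
        if len(tuple_data) == 2:
            data = tuple_data[1]
        action = tuple_data[0]

        lines.append(json.dumps(action))     # action line
        if data is not None:
            lines.append(json.dumps(data))   # optional source line

    payload = '\n'.join(lines) + '\n'        # body handed to client.bulk
    print(payload)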
From 24c2493c364c09a9547a8ebc44b4e7dd2f72cdc7 Mon Sep 17 00:00:00 2001
From: rainjay
Date: Sun, 8 Apr 2018 13:38:06 +0800
Subject: [PATCH 18/19] add retry_handler test

---
 tests/test_bulk.py | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/tests/test_bulk.py b/tests/test_bulk.py
index 803c2942..f558ac62 100644
--- a/tests/test_bulk.py
+++ b/tests/test_bulk.py
@@ -4,7 +4,7 @@
 
 import pytest
 
-from aioelasticsearch.helpers import bulk, concurrency_bulk
+from aioelasticsearch.helpers import bulk, concurrency_bulk, _retry_handler
 from aioelasticsearch import Elasticsearch, TransportError
 
 logger = logging.getLogger('elasticsearch')
@@ -79,3 +79,22 @@ async def test_bulk_raise_exception(loop):
     ]
     with pytest.raises(TransportError):
         success, fails = await bulk(es, datas, stats_only=True)
+
+
+@pytest.mark.run_loop
+async def test_retry_handler(es):
+    async def mock_data():
+        # finish_count, [( es_action, source_data ), ... ]
+        return 0, [(
+            {'index': {'_index': 'test_aioes', '_type': 'test_aioes', '_id': 100}},
+            {'name': 'Karl 1', 'email': 'karl@example.com'}),
+            ({'index': {'_index': 'test_aioes', '_type': 'test_aioes','_id': 101}},
+            {'name': 'Karl 2', 'email': 'karl@example.com'})]
+
+    done, fail = await _retry_handler(es,
+                                      mock_data(),
+                                      max_retries=1,
+                                      initial_backoff=2,
+                                      max_backoff=600)
+    assert done == 2
+    assert fail == []

From 2b561e72ac49c50d6dbb7b2388cdd13ba8dc5265 Mon Sep 17 00:00:00 2001
From: rainjay
Date: Sun, 8 Apr 2018 14:05:08 +0800
Subject: [PATCH 19/19] update readme

---
 README.rst | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/README.rst b/README.rst
index 13aee3b1..5ca0652b 100644
--- a/README.rst
+++ b/README.rst
@@ -98,8 +98,7 @@ Asynchronous `bulk