From 0a42f4d4545e3557f0b8aada5d16dd8fb841dddd Mon Sep 17 00:00:00 2001 From: Yangxulight Date: Fri, 8 Nov 2019 16:14:41 +0800 Subject: [PATCH 1/3] add throughput summary and fix timmer blocked by end=3 count=0 settings --- .../eventgen_controller_api.py | 37 +++++++++++++------ splunk_eventgen/lib/eventgentimer.py | 1 + 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/splunk_eventgen/eventgen_api_server/eventgen_controller_api.py b/splunk_eventgen/eventgen_api_server/eventgen_controller_api.py index 4b060df1..1d5a20e1 100644 --- a/splunk_eventgen/eventgen_api_server/eventgen_controller_api.py +++ b/splunk_eventgen/eventgen_api_server/eventgen_controller_api.py @@ -17,20 +17,20 @@ def __init__(self, redis_connector, host): self.interval = 0.001 self.server_responses = {} - + def get_blueprint(self): return self.bp - + def __create_blueprint(self): bp = Blueprint('api', __name__) - + def publish_message(job, request_method, body=None, target="all"): message_uuid = str(uuid.uuid4()) formatted_message = json.dumps({'job': job, 'target': target, 'body': body, 'request_method': request_method, 'message_uuid': message_uuid}) self.redis_connector.message_connection.publish(self.redis_connector.servers_channel, formatted_message) self.logger.info("Published {}".format(formatted_message)) return message_uuid - + def gather_response(target_job, message_uuid, response_number_target=0): if not response_number_target: response_number_target = int(self.redis_connector.message_connection.pubsub_numsub(self.redis_connector.servers_channel)[0][1]) @@ -55,6 +55,8 @@ def gather_response(target_job, message_uuid, response_number_target=0): if response_message_uuid not in self.server_responses: self.server_responses[response_message_uuid] = {} self.server_responses[response_message_uuid][server_response['host']] = server_response['response'] + if target_job == 'status': + self.server_responses[message_uuid] = self.calculate_throughput(data=self.server_responses[message_uuid]) return self.server_responses.get(message_uuid, {}) @bp.route('/index', methods=['GET']) @@ -65,7 +67,7 @@ def index(): You are running Eventgen Controller.\n''' host = self.host return home_page.format(host, self.redis_connector.get_registered_servers()) - + @bp.route('/status', methods=['GET'], defaults={'target': 'all'}) @bp.route('/status/', methods=['GET']) def http_status(target): @@ -96,7 +98,7 @@ def http_bundle(target): except Exception as e: self.logger.error(e) return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500) - + @bp.route('/setup', methods=['POST'], defaults={'target': 'all'}) @bp.route('/setup/', methods=['POST']) def http_setup(target): @@ -106,7 +108,7 @@ def http_setup(target): except Exception as e: self.logger.error(e) return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500) - + @bp.route('/volume', methods=['GET', 'POST'], defaults={'target': 'all'}) @bp.route('/volume/', methods=['GET', 'POST']) def http_volume(target): @@ -117,7 +119,7 @@ def http_volume(target): except Exception as e: self.logger.error(e) return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500) - + @bp.route('/start', methods=['POST'], defaults={'target': 'all'}) @bp.route('/start/', methods=['POST']) def http_start(target): @@ -137,7 +139,7 @@ def http_stop(target): except Exception as e: self.logger.error(e) return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500) - + @bp.route('/restart', methods=['POST'], defaults={'target': 'all'}) @bp.route('/restart/', methods=['POST']) def http_restart(target): @@ -147,7 +149,7 @@ def http_restart(target): except Exception as e: self.logger.error(e) return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500) - + @bp.route('/reset', methods=['POST'], defaults={'target': 'all'}) @bp.route('/reset/', methods=['POST']) def http_reset(target): @@ -176,8 +178,21 @@ def http_healthcheck(target): except Exception as e: self.logger.error(e) return Response(INTERNAL_ERROR_RESPONSE, mimetype='application/json', status=500) - + return bp + def calculate_throughput(self, data): + throughput_summary = {'TOTAL_VOLUME_MB': 0, 'TOTAL_COUNT': 0, 'THROUGHPUT_VOLUME_KB': 0, 'THROUGHPUT_COUNT': 0} + for server_name, server_status in data.items(): + if server_name != 'time' and 'THROUGHPUT_STATUS' in server_status: + server_throughput = server_status['THROUGHPUT_STATUS'] + throughput_summary['TOTAL_VOLUME_MB'] += server_throughput['TOTAL_VOLUME_MB'] + throughput_summary['TOTAL_COUNT'] += server_throughput['TOTAL_COUNT'] + throughput_summary['THROUGHPUT_VOLUME_KB'] += server_throughput['THROUGHPUT_VOLUME_KB'] + throughput_summary['THROUGHPUT_COUNT'] += server_throughput['THROUGHPUT_COUNT'] + data['THROUGHTPUT_SUMMARY'] = throughput_summary + self.logger.debug("throughput summary: {}".format(throughput_summary)) + return data + def __make_error_response(self, status, message): return Response(json.dumps({'message': message}), status=status) diff --git a/splunk_eventgen/lib/eventgentimer.py b/splunk_eventgen/lib/eventgentimer.py index d374ee81..692459e4 100644 --- a/splunk_eventgen/lib/eventgentimer.py +++ b/splunk_eventgen/lib/eventgentimer.py @@ -176,6 +176,7 @@ def real_run(self): logger.info( "There is no data to be generated in worker {0} because the count is {1}.".format( self.sample.config.generatorWorkers, count)) + self.executions += 1 else: # Spawn workers at the beginning of job rather than wait for next interval logger.info("Starting '%d' generatorWorkers for sample '%s'" % From 0a1bd0b26de55be7758067fe3f3000ecf1d4fea1 Mon Sep 17 00:00:00 2001 From: Yangxulight Date: Mon, 2 Dec 2019 17:08:51 +0800 Subject: [PATCH 2/3] add throughput function test --- tests/large/test_eventgen_orchestration.py | 28 ++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/tests/large/test_eventgen_orchestration.py b/tests/large/test_eventgen_orchestration.py index ba67e05b..a3ce484a 100644 --- a/tests/large/test_eventgen_orchestration.py +++ b/tests/large/test_eventgen_orchestration.py @@ -72,7 +72,7 @@ def setup_class(cls): print('creating server') redis_host = container["Id"][:12] container = cls.client.create_container( - image=IMAGE_NAME, command="server", environment=["REDIS_HOST={}".format(redis_host)], + image=IMAGE_NAME, command="server", environment=["REDIS_HOST={}".format(redis_host)], host_config=host_config, networking_config=networking_config) cls.client.start(container["Id"]) @@ -212,7 +212,7 @@ def test_controller_set_volume_with_volume_and_target(self): assert r.status_code == 200 output = json.loads(r.content) assert output[TestEventgenOrchestration.server_id[:12]]["perDayVolume"] == 20 - + def test_controller_stop(self): r = requests.post("http://127.0.0.1:{}/stop".format(self.controller_eventgen_webport)) assert r.status_code == 200 @@ -308,3 +308,27 @@ def test_server_get_and_set_volume(self): assert r.status_code == 200 output = json.loads(r.content) assert output["perDayVolume"] == 150.0 + + def test_server_get_throughput(self): + r = requests.put("http://127.0.0.1:{}/conf".format(self.server_eventgen_webport), json={"windbag": { 'count' : 1000, "end":1}, "general": {"outputCounter": True}}) + assert r.status_code == 200 + assert json.loads(r.content) + r = requests.post("http://127.0.0.1:{}/start".format(self.server_eventgen_webport), timeout=5) + assert r.status_code == 200 + timeout = 60 + while True: + r = requests.get("http://127.0.0.1:{}/status".format(self.server_eventgen_webport)) + assert r.status_code == 200 + output = json.loads(r.content) + assert output + if output['EVENTGEN_STATUS'] == 1: + time.sleep(5) + timeout -= 5 + assert timeout >= 0 + r = requests.get("http://127.0.0.1:{}/status".format(self.server_eventgen_webport)) + assert r.status_code == 200 + output = json.loads(r.content) + assert output + assert output['THROUGHTPUT_SUMMARY']['TOTAL_COUNT'] == 1000 + assert output['THROUGHTPUT_SUMMARY']['TOTAL_VOLUME_MB'] > 0 + From ec9192c47d1742dec36c9d86446b8f811b81145c Mon Sep 17 00:00:00 2001 From: Yangxulight Date: Mon, 2 Dec 2019 17:27:57 +0800 Subject: [PATCH 3/3] add completed status check code --- tests/large/test_eventgen_orchestration.py | 32 ++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/tests/large/test_eventgen_orchestration.py b/tests/large/test_eventgen_orchestration.py index a3ce484a..7122bd02 100644 --- a/tests/large/test_eventgen_orchestration.py +++ b/tests/large/test_eventgen_orchestration.py @@ -136,7 +136,7 @@ def test_controller_status(self): current_retry += 1 time.sleep(10) assert output - + def test_controller_conf(self): r = requests.post("http://127.0.0.1:{}/conf".format(self.controller_eventgen_webport), json=self.test_json) assert r.status_code == 200 @@ -255,6 +255,32 @@ def test_server_status(self): assert output assert output['EVENTGEN_STATUS'] == 0 assert output['TOTAL_VOLUME'] == 20 + # check if eventgen status changed when it finished + config_json = { + "windbag": { + "end": "3", + "count": "0" + } + } + r = requests.post("http://127.0.0.1:{}/conf".format(self.server_eventgen_webport), json=config_json) + assert r.status_code == 200 + assert json.loads(r.content) == config_json + r = requests.post("http://127.0.0.1:{}/start".format(self.server_eventgen_webport), timeout=5) + assert r.status_code == 200 + timeout = 60 + while True: + r = requests.get("http://127.0.0.1:{}/status".format(self.server_eventgen_webport)) + assert r.status_code == 200 + output = json.loads(r.content) + assert output + if output['EVENTGEN_STATUS'] == 1: + time.sleep(5) + timeout -= 5 + if timeout < 0: + break + assert timeout >= 0 + assert output['EVENTGEN_STATUS'] == 2 + def test_server_get_and_set_conf(self): r = requests.get("http://127.0.0.1:{}/conf".format(self.server_eventgen_webport)) @@ -324,7 +350,9 @@ def test_server_get_throughput(self): if output['EVENTGEN_STATUS'] == 1: time.sleep(5) timeout -= 5 - assert timeout >= 0 + if timeout < 0: + break + assert timeout >= 0 r = requests.get("http://127.0.0.1:{}/status".format(self.server_eventgen_webport)) assert r.status_code == 200 output = json.loads(r.content)