diff --git a/swagger_server/controllers/topology_controller.py b/swagger_server/controllers/topology_controller.py index 4c867fd..948b497 100644 --- a/swagger_server/controllers/topology_controller.py +++ b/swagger_server/controllers/topology_controller.py @@ -20,7 +20,9 @@ logging.getLogger("pika").setLevel(logging.WARNING) MANIFEST = os.environ.get("MANIFEST") -SDXLC_DOMAIN = os.environ.get("SDXLC_DOMAIN") +OXPO_ID = int(os.environ.get("OXPO_ID")) +SDX_LC_DOMAINS = os.environ.get("SDXLC_DOMAIN") +SDXLC_DOMAIN = SDX_LC_DOMAINS.split(",")[OXPO_ID] # Get DB connection and tables set up. db_instance = DbUtils() @@ -53,7 +55,10 @@ def add_topology(body): # noqa: E501 if msg_id is None: return "ID is missing." + logger.info(f"msg_id: {msg_id}") + logger.info(f"SDX_LC domain: {SDXLC_DOMAIN}") domain_name = find_between(msg_id, "topology:", ".net") + logger.info(f"domain name: {domain_name}") if domain_name != SDXLC_DOMAIN: logger.debug("Domain name not matching LC domain. Returning 400 status.") return "Domain name not matching LC domain. Please check again.", 400 @@ -66,7 +71,7 @@ def add_topology(body): # noqa: E501 db_instance.add_key_value_pair_to_db("latest_topology", json_body) logger.debug("Saving to database complete.") - logger.debug("Publishing Message to MQ: {}".format(body)) + # logger.info("Publishing Message to MQ: {}".format(body)) # initiate rpc producer with 5 seconds timeout rpc = RpcProducer(5, "", "topo") @@ -74,7 +79,10 @@ def add_topology(body): # noqa: E501 # Signal to end keep alive pings. rpc.stop() - return str(response) + logger.info(f"sdx lc response: {response}") + logger.info(f"json_body: {json_body}") + # return str(response) + return json_body def delete_topology(topology_id, api_key=None): # noqa: E501 diff --git a/swagger_server/messaging/rpc_queue_consumer.py b/swagger_server/messaging/rpc_queue_consumer.py index 7efb86b..bd61fb9 100644 --- a/swagger_server/messaging/rpc_queue_consumer.py +++ b/swagger_server/messaging/rpc_queue_consumer.py @@ -15,6 +15,9 @@ # hardcode for testing MQ_HOST = "aw-sdx-monitor.renci.org" +MQ_SRVC = os.environ.get("MQ_SRVC") +MQ_USER = os.environ.get("MQ_USER") +MQ_PASS = os.environ.get("MQ_PASS") SUB_QUEUE = "connection" SUB_TOPIC = "lc1_q1" SUB_EXCHANGE = "connection" @@ -23,17 +26,14 @@ class RpcConsumer(object): def __init__(self, thread_queue, exchange_name): self.logger = logging.getLogger(__name__) - SLEEP_TIME = 60 + SLEEP_TIME = 30 self.logger.info(' [*] Sleeping for %s seconds.', SLEEP_TIME) time.sleep(SLEEP_TIME) self.logger.info(' [*] Connecting to server ...') - credentials = pika.PlainCredentials('mq_user', 'mq_pwd') + credentials = pika.PlainCredentials(MQ_USER, MQ_PASS) self.connection = pika.BlockingConnection( - pika.ConnectionParameters('rabbitmq3', 5672, '/', credentials)) - # self.connection = pika.BlockingConnection( - # pika.ConnectionParameters(host=MQ_HOST) - # ) + pika.ConnectionParameters(MQ_SRVC, 5672, '/', credentials)) self.channel = self.connection.channel() self.exchange_name = exchange_name @@ -52,7 +52,7 @@ def on_request(self, ch, method, props, message_body): response = message_body self._thread_queue.put(message_body) - SLEEP_TIME = 60 + SLEEP_TIME = 30 self.logger.info(' [*] Sleeping for %s seconds.', SLEEP_TIME) time.sleep(SLEEP_TIME) @@ -60,9 +60,6 @@ def on_request(self, ch, method, props, message_body): credentials = pika.PlainCredentials('mq_user', 'mq_pwd') self.connection = pika.BlockingConnection( pika.ConnectionParameters('rabbitmq3', 5672, '/', credentials)) - # self.connection = pika.BlockingConnection( - # pika.ConnectionParameters(host=MQ_HOST) - # ) self.channel = self.connection.channel() ch.basic_publish( diff --git a/swagger_server/messaging/rpc_queue_producer.py b/swagger_server/messaging/rpc_queue_producer.py index b0f3cc9..a0cd309 100644 --- a/swagger_server/messaging/rpc_queue_producer.py +++ b/swagger_server/messaging/rpc_queue_producer.py @@ -8,14 +8,23 @@ import pika MQ_HOST = os.environ.get("MQ_HOST") - +MQ_SRVC = os.environ.get("MQ_SRVC") +MQ_USER = os.environ.get("MQ_USER") +MQ_PASS = os.environ.get("MQ_PASS") class RpcProducer(object): def __init__(self, timeout, exchange_name, routing_key): self.logger = logging.getLogger(__name__) + + SLEEP_TIME = 5 + self.logger.info(' [*] Sleeping for %s seconds.', SLEEP_TIME) + time.sleep(SLEEP_TIME) + + self.logger.info(' [*] Connecting to server ...') + credentials = pika.PlainCredentials(MQ_USER, MQ_PASS) self.connection = pika.BlockingConnection( - pika.ConnectionParameters(host=MQ_HOST) - ) + pika.ConnectionParameters(MQ_SRVC, 5672, '/', credentials)) + self.channel = self.connection.channel() self.timeout = timeout diff --git a/swagger_server/messaging/topic_queue_consumer.py b/swagger_server/messaging/topic_queue_consumer.py index b9e29cf..63b17dd 100644 --- a/swagger_server/messaging/topic_queue_consumer.py +++ b/swagger_server/messaging/topic_queue_consumer.py @@ -12,11 +12,14 @@ from swagger_server.utils.db_utils import DbUtils MQ_HOST = os.environ.get("MQ_HOST") +MQ_SRVC = os.environ.get("MQ_SRVC") +MQ_USER = os.environ.get("MQ_USER") +MQ_PASS = os.environ.get("MQ_PASS") # subscribe to the corresponding queue SUB_QUEUE = os.environ.get("SUB_QUEUE") SUB_TOPIC = os.environ.get("SUB_TOPIC") SUB_EXCHANGE = os.environ.get("SUB_EXCHANGE") -KYTOS_URL = os.environ.get("KYTOS_URL") +KYTOS_PROVISION = os.environ.get("KYTOS_PROVISION") def is_json(myjson): @@ -32,17 +35,14 @@ def __init__(self, thread_queue, exchange_name): self.logger = logging.getLogger(__name__) - SLEEP_TIME = 60 + SLEEP_TIME = 30 self.logger.info(' [*] Sleeping for %s seconds.', SLEEP_TIME) time.sleep(SLEEP_TIME) self.logger.info(' [*] Connecting to server ...') - credentials = pika.PlainCredentials('mq_user', 'mq_pwd') + credentials = pika.PlainCredentials(MQ_USER, MQ_PASS) self.connection = pika.BlockingConnection( - pika.ConnectionParameters('rabbitmq3', 5672, '/', credentials)) - # self.connection = pika.BlockingConnection( - # pika.ConnectionParameters(host=MQ_HOST) - # ) + pika.ConnectionParameters(MQ_SRVC, 5672, '/', credentials)) self.channel = self.connection.channel() self.exchange_name = exchange_name @@ -64,17 +64,14 @@ def on_rpc_request(self, ch, method, props, message_body): response = message_body self._thread_queue.put(message_body) - SLEEP_TIME = 60 + SLEEP_TIME = 30 self.logger.info(' [*] Sleeping for %s seconds.', SLEEP_TIME) time.sleep(SLEEP_TIME) self.logger.info(' [*] Connecting to server ...') - credentials = pika.PlainCredentials('mq_user', 'mq_pwd') + credentials = pika.PlainCredentials(MQ_USER, MQ_PASS) self.connection = pika.BlockingConnection( - pika.ConnectionParameters('rabbitmq3', 5672, '/', credentials)) - # self.connection = pika.BlockingConnection( - # pika.ConnectionParameters(host=MQ_HOST) - # ) + pika.ConnectionParameters(MQ_SRVC, 5672, '/', credentials)) self.channel = self.connection.channel() @@ -111,10 +108,10 @@ def handle_mq_msg(self, msg_body): self.logger.info("Sending connection info to Kytos.") # Uncomment lines below to send connection info to Kytos try: - r = requests.post(str(KYTOS_URL), json=msg_json) + r = requests.post(str(KYTOS_PROVISION), json=msg_json) self.logger.info(f"Status from Kytos: {r}") except Exception as e: - self.logger.error(f"Error on POST to {KYTOS_URL}: {e}") + self.logger.error(f"Error on POST to {KYTOS_PROVISION}: {e}") self.logger.info( "Check your configuration and make sure kytos is running." ) diff --git a/swagger_server/messaging/topic_queue_publisher.py b/swagger_server/messaging/topic_queue_publisher.py index 760850e..d18565a 100644 --- a/swagger_server/messaging/topic_queue_publisher.py +++ b/swagger_server/messaging/topic_queue_publisher.py @@ -11,14 +11,21 @@ # hardcode for testing MQ_HOST = "aw-sdx-monitor.renci.org" - +MQ_SRVC = os.environ.get("MQ_SRVC") +MQ_USER = os.environ.get("MQ_USER") +MQ_PASS = os.environ.get("MQ_PASS") class TopicQueueProducer(object): def __init__(self, timeout, exchange_name, routing_key): self.logger = logging.getLogger(__name__) + SLEEP_TIME = 30 + self.logger.info(' [*] Sleeping for %s seconds.', SLEEP_TIME) + time.sleep(SLEEP_TIME) + + self.logger.info(' [*] Connecting to server ...') + credentials = pika.PlainCredentials(MQ_USER, MQ_PASS) self.connection = pika.BlockingConnection( - pika.ConnectionParameters(host=MQ_HOST) - ) + pika.ConnectionParameters(MQ_SRVC, 5672, '/', credentials)) self.channel = self.connection.channel() self.timeout = timeout