Skip to content

Commit

Permalink
creating dockerfile to test sdx-lc on sdx-continuous-development #100
Browse files Browse the repository at this point in the history
  • Loading branch information
lmarinve committed Nov 8, 2023
1 parent dcca760 commit 37c586f
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 34 deletions.
14 changes: 11 additions & 3 deletions swagger_server/controllers/topology_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
logging.getLogger("pika").setLevel(logging.WARNING)

MANIFEST = os.environ.get("MANIFEST")
SDXLC_DOMAIN = os.environ.get("SDXLC_DOMAIN")
OXPO_ID = int(os.environ.get("OXPO_ID"))
SDX_LC_DOMAINS = os.environ.get("SDXLC_DOMAIN")
SDXLC_DOMAIN = SDX_LC_DOMAINS.split(",")[OXPO_ID]

# Get DB connection and tables set up.
db_instance = DbUtils()
Expand Down Expand Up @@ -53,7 +55,10 @@ def add_topology(body): # noqa: E501
if msg_id is None:
return "ID is missing."

logger.info(f"msg_id: {msg_id}")
logger.info(f"SDX_LC domain: {SDXLC_DOMAIN}")
domain_name = find_between(msg_id, "topology:", ".net")
logger.info(f"domain name: {domain_name}")
if domain_name != SDXLC_DOMAIN:
logger.debug("Domain name not matching LC domain. Returning 400 status.")
return "Domain name not matching LC domain. Please check again.", 400
Expand All @@ -66,15 +71,18 @@ def add_topology(body): # noqa: E501
db_instance.add_key_value_pair_to_db("latest_topology", json_body)
logger.debug("Saving to database complete.")

logger.debug("Publishing Message to MQ: {}".format(body))
# logger.info("Publishing Message to MQ: {}".format(body))

# initiate rpc producer with 5 seconds timeout
rpc = RpcProducer(5, "", "topo")
response = rpc.call(json_body)
# Signal to end keep alive pings.
rpc.stop()

return str(response)
logger.info(f"sdx lc response: {response}")
logger.info(f"json_body: {json_body}")
# return str(response)
return json_body


def delete_topology(topology_id, api_key=None): # noqa: E501
Expand Down
17 changes: 7 additions & 10 deletions swagger_server/messaging/rpc_queue_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

# hardcode for testing
MQ_HOST = "aw-sdx-monitor.renci.org"
MQ_SRVC = os.environ.get("MQ_SRVC")
MQ_USER = os.environ.get("MQ_USER")
MQ_PASS = os.environ.get("MQ_PASS")
SUB_QUEUE = "connection"
SUB_TOPIC = "lc1_q1"
SUB_EXCHANGE = "connection"
Expand All @@ -23,17 +26,14 @@
class RpcConsumer(object):
def __init__(self, thread_queue, exchange_name):
self.logger = logging.getLogger(__name__)
SLEEP_TIME = 60
SLEEP_TIME = 30
self.logger.info(' [*] Sleeping for %s seconds.', SLEEP_TIME)
time.sleep(SLEEP_TIME)

self.logger.info(' [*] Connecting to server ...')
credentials = pika.PlainCredentials('mq_user', 'mq_pwd')
credentials = pika.PlainCredentials(MQ_USER, MQ_PASS)
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('rabbitmq3', 5672, '/', credentials))
# self.connection = pika.BlockingConnection(
# pika.ConnectionParameters(host=MQ_HOST)
# )
pika.ConnectionParameters(MQ_SRVC, 5672, '/', credentials))

self.channel = self.connection.channel()
self.exchange_name = exchange_name
Expand All @@ -52,17 +52,14 @@ def on_request(self, ch, method, props, message_body):
response = message_body
self._thread_queue.put(message_body)

SLEEP_TIME = 60
SLEEP_TIME = 30
self.logger.info(' [*] Sleeping for %s seconds.', SLEEP_TIME)
time.sleep(SLEEP_TIME)

self.logger.info(' [*] Connecting to server ...')
credentials = pika.PlainCredentials('mq_user', 'mq_pwd')
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('rabbitmq3', 5672, '/', credentials))
# self.connection = pika.BlockingConnection(
# pika.ConnectionParameters(host=MQ_HOST)
# )
self.channel = self.connection.channel()

ch.basic_publish(
Expand Down
15 changes: 12 additions & 3 deletions swagger_server/messaging/rpc_queue_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,23 @@
import pika

MQ_HOST = os.environ.get("MQ_HOST")

MQ_SRVC = os.environ.get("MQ_SRVC")
MQ_USER = os.environ.get("MQ_USER")
MQ_PASS = os.environ.get("MQ_PASS")

class RpcProducer(object):
def __init__(self, timeout, exchange_name, routing_key):
self.logger = logging.getLogger(__name__)

SLEEP_TIME = 5
self.logger.info(' [*] Sleeping for %s seconds.', SLEEP_TIME)
time.sleep(SLEEP_TIME)

self.logger.info(' [*] Connecting to server ...')
credentials = pika.PlainCredentials(MQ_USER, MQ_PASS)
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=MQ_HOST)
)
pika.ConnectionParameters(MQ_SRVC, 5672, '/', credentials))


self.channel = self.connection.channel()
self.timeout = timeout
Expand Down
27 changes: 12 additions & 15 deletions swagger_server/messaging/topic_queue_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
from swagger_server.utils.db_utils import DbUtils

MQ_HOST = os.environ.get("MQ_HOST")
MQ_SRVC = os.environ.get("MQ_SRVC")
MQ_USER = os.environ.get("MQ_USER")
MQ_PASS = os.environ.get("MQ_PASS")
# subscribe to the corresponding queue
SUB_QUEUE = os.environ.get("SUB_QUEUE")
SUB_TOPIC = os.environ.get("SUB_TOPIC")
SUB_EXCHANGE = os.environ.get("SUB_EXCHANGE")
KYTOS_URL = os.environ.get("KYTOS_URL")
KYTOS_PROVISION = os.environ.get("KYTOS_PROVISION")


def is_json(myjson):
Expand All @@ -32,17 +35,14 @@ def __init__(self, thread_queue, exchange_name):
self.logger = logging.getLogger(__name__)


SLEEP_TIME = 60
SLEEP_TIME = 30
self.logger.info(' [*] Sleeping for %s seconds.', SLEEP_TIME)
time.sleep(SLEEP_TIME)

self.logger.info(' [*] Connecting to server ...')
credentials = pika.PlainCredentials('mq_user', 'mq_pwd')
credentials = pika.PlainCredentials(MQ_USER, MQ_PASS)
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('rabbitmq3', 5672, '/', credentials))
# self.connection = pika.BlockingConnection(
# pika.ConnectionParameters(host=MQ_HOST)
# )
pika.ConnectionParameters(MQ_SRVC, 5672, '/', credentials))

self.channel = self.connection.channel()
self.exchange_name = exchange_name
Expand All @@ -64,17 +64,14 @@ def on_rpc_request(self, ch, method, props, message_body):
response = message_body
self._thread_queue.put(message_body)

SLEEP_TIME = 60
SLEEP_TIME = 30
self.logger.info(' [*] Sleeping for %s seconds.', SLEEP_TIME)
time.sleep(SLEEP_TIME)

self.logger.info(' [*] Connecting to server ...')
credentials = pika.PlainCredentials('mq_user', 'mq_pwd')
credentials = pika.PlainCredentials(MQ_USER, MQ_PASS)
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('rabbitmq3', 5672, '/', credentials))
# self.connection = pika.BlockingConnection(
# pika.ConnectionParameters(host=MQ_HOST)
# )
pika.ConnectionParameters(MQ_SRVC, 5672, '/', credentials))

self.channel = self.connection.channel()

Expand Down Expand Up @@ -111,10 +108,10 @@ def handle_mq_msg(self, msg_body):
self.logger.info("Sending connection info to Kytos.")
# Uncomment lines below to send connection info to Kytos
try:
r = requests.post(str(KYTOS_URL), json=msg_json)
r = requests.post(str(KYTOS_PROVISION), json=msg_json)
self.logger.info(f"Status from Kytos: {r}")
except Exception as e:
self.logger.error(f"Error on POST to {KYTOS_URL}: {e}")
self.logger.error(f"Error on POST to {KYTOS_PROVISION}: {e}")
self.logger.info(
"Check your configuration and make sure kytos is running."
)
Expand Down
13 changes: 10 additions & 3 deletions swagger_server/messaging/topic_queue_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,21 @@

# hardcode for testing
MQ_HOST = "aw-sdx-monitor.renci.org"

MQ_SRVC = os.environ.get("MQ_SRVC")
MQ_USER = os.environ.get("MQ_USER")
MQ_PASS = os.environ.get("MQ_PASS")

class TopicQueueProducer(object):
def __init__(self, timeout, exchange_name, routing_key):
self.logger = logging.getLogger(__name__)
SLEEP_TIME = 30
self.logger.info(' [*] Sleeping for %s seconds.', SLEEP_TIME)
time.sleep(SLEEP_TIME)

self.logger.info(' [*] Connecting to server ...')
credentials = pika.PlainCredentials(MQ_USER, MQ_PASS)
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=MQ_HOST)
)
pika.ConnectionParameters(MQ_SRVC, 5672, '/', credentials))

self.channel = self.connection.channel()
self.timeout = timeout
Expand Down

0 comments on commit 37c586f

Please sign in to comment.