From 6db625dc533e986c6b184ee2e04e92f72727e56f Mon Sep 17 00:00:00 2001 From: James Green Date: Wed, 29 May 2024 11:01:51 +0100 Subject: [PATCH 1/2] add ability to assign APs --- example.env | 1 + example_ap_list.csv | 2 + src/build_juniper_payload.py | 28 ++- src/juniper_ap_staging.py | 205 ++++++++++++++++++ src/juniper_client.py | 18 ++ src/{juniper.py => juniper_site_creation.py} | 64 +++--- src/main.py | 65 +++--- test/test_build_juniper_payload.py | 38 +++- test/test_juniper_ap_staging.py | 112 ++++++++++ test/test_juniper_client.py | 1 - ...niper.py => test_juniper_site_creation.py} | 123 +++++------ test/test_main.py | 46 +--- 12 files changed, 519 insertions(+), 184 deletions(-) create mode 100644 example_ap_list.csv create mode 100644 src/juniper_ap_staging.py rename src/{juniper.py => juniper_site_creation.py} (81%) create mode 100644 test/test_juniper_ap_staging.py rename test/{test_juniper.py => test_juniper_site_creation.py} (64%) diff --git a/example.env b/example.env index ea45c81..3da64d3 100644 --- a/example.env +++ b/example.env @@ -4,6 +4,7 @@ SITE_GROUP_IDS={"moj_wifi": "foo","gov_wifi": "bar"} RF_TEMPLATE_ID= NETWORK_TEMPLATE_ID= AP_VERSIONS={"AP45": "0.12.27139", "AP32": "0.12.27139"} +OPERATION=site_creation OR ap_staging ; Optional config options ;MIST_USERNAME= diff --git a/example_ap_list.csv b/example_ap_list.csv new file mode 100644 index 0000000..3af6eb6 --- /dev/null +++ b/example_ap_list.csv @@ -0,0 +1,2 @@ +SITE FITS ID,Site Name,Location,Device Type,Make/Model,Host Name,Serial Number,MAC Address, Claim Code, RF Report Reference +FITS_0982,HMP Norwich,"102PF Probation Test Site,not on floorplan",Access Point,AP45,MOJ-NS-1234-WAP001,A164622020622,a8:f7:d9:82:0d:23,A123456789,Blah Blah Blah \ No newline at end of file diff --git a/src/build_juniper_payload.py b/src/build_juniper_payload.py index 7081c02..e23309e 100644 --- a/src/build_juniper_payload.py +++ b/src/build_juniper_payload.py @@ -1,5 +1,6 @@ import json - +from datetime import datetime +import sys class BuildPayload: def __init__(self, @@ -220,3 +221,28 @@ def _build_site_payload( json_objects.append(site_dict) return json_objects + +def plan_of_action( + processed_payload +): + # Generate a timestamp for the filename + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + plan_file_name = "../data_src/mist_plan_{time}.json".format(time=timestamp) + + # Load file + with open(plan_file_name, "w") as plan_file: + json.dump(processed_payload, plan_file, indent=2) + + # Print to terminal + with open(plan_file_name, "r") as plan_file: + print(plan_file.read()) + + print("A file containing all the changes has been created: {file}".format( + file=plan_file_name)) + user_input = input("Do you wish to continue? (y/n): ").upper() + if user_input == "Y": + print("Continuing with run") + elif user_input == "N": + sys.exit(0) + else: + raise ValueError('Invalid input') \ No newline at end of file diff --git a/src/juniper_ap_staging.py b/src/juniper_ap_staging.py new file mode 100644 index 0000000..f199cd3 --- /dev/null +++ b/src/juniper_ap_staging.py @@ -0,0 +1,205 @@ +from juniper_client import JuniperClient +from build_juniper_payload import plan_of_action + + +def validate_if_site_defined_on_csv_exists_in_mist(mist_sites_configs: list['dict'], + ap_csv: list['dict']): + mist_site_names = [] + for mist_site_config in mist_sites_configs: + mist_site_names.append(mist_site_config['name']) + + mist_site_names_csv_file = [] + for ap_row in ap_csv: + mist_site_names_csv_file.append(ap_row['Site Name']) + + for site_name in mist_site_names_csv_file: + if site_name not in mist_site_names: + raise ValueError(f"Site name '{site_name}' found in CSV file but not in mist site configurations.") + + +def validate_if_ap_defined_on_csv_exists_in_mist(mist_inventory: list['dict'], + ap_csv: list['dict']): + ap_mac_addresses_from_mist = [] + for inventory_item in mist_inventory: + if inventory_item['type'] == 'ap': + ap_mac_addresses_from_mist.append(inventory_item['mac']) + + ap_mac_addresses_from_csv = [] + for ap_row in ap_csv: + cleaned_mac_address = ap_row['MAC Address'].replace(":", "") + ap_mac_addresses_from_csv.append(cleaned_mac_address) + + for site_name in ap_mac_addresses_from_csv: + if site_name not in ap_mac_addresses_from_mist: + raise ValueError(f"Site name '{site_name}' found in CSV file but not in mist site configurations.") + + +def find_site_by_name(site_name: str, + site_and_mac_reservations: list): + for site in site_and_mac_reservations: + if site["SiteName"] == site_name: + return site + return None + + +def format_site_and_mac_reservations(ap_csv: list['dict'] + ) -> list['dict']: + site_and_mac_reservations = [] + + for ap_row in ap_csv: + site_name = ap_row.get("Site Name", "") + mac_address = ap_row.get("MAC Address", "").replace(":", "") + + # Find the existing site in the list + existing_site = find_site_by_name(site_name, site_and_mac_reservations) + + if existing_site: + # Update the mac_addresses field by appending the new MAC address + if mac_address not in existing_site["mac_addresses"]: + existing_site["mac_addresses"] += f",{mac_address}" + else: + # Create a new dictionary with the desired keys and values + new_item = { + "" + "SiteName": site_name, + "mac_addresses": mac_address + } + # Append the new dictionary to the output list + site_and_mac_reservations.append(new_item) + + return site_and_mac_reservations + + +def find_site_id( + site_and_mac_reservations_without_mist_site_ids: list['dict'], + mist_sites_configs: list['dict'] + ) -> list['dict']: + + for site_csv in site_and_mac_reservations_without_mist_site_ids: + site_found = False + for site_mist in mist_sites_configs: + if site_csv['SiteName'] == site_mist['name']: + site_csv['site_id'] = site_mist['id'] + site_found = True + break + if not site_found: + raise ValueError(f"No matching site_id found for site: {site_csv['SiteName']}") + + return site_and_mac_reservations_without_mist_site_ids + + +def get_site_payload_for_inventory_assign_to_site( + site_and_mac_reservations_including_mist_site_ids: list['dict'] + ) -> list['dict']: + payload = [] + + for site in site_and_mac_reservations_including_mist_site_ids: + payload.append( + { + "op": "assign", + "site_id": site['site_id'], + "macs": [ + site['mac_addresses'] + ], + "no_reassign": False, + "disable_auto_config": False, + "managed": False + } + ) + + return payload + +def rename_ap(admin: object, + site_id: str, + ap_id: str, + new_ap_name: str): + api_url = '/api/v1/sites/' + site_id + '/devices/' + ap_id + response = admin.put(api_url, payload={'name': new_ap_name}) + if response == False: + print('Failed to rename ap to new desired name {}'.format(new_ap_name)) + else: + print('Successfully renamed ap to new desired name {}'.format(new_ap_name)) + +def build_rename_ap_payload( + inventory_payloads: list['dict'], + mist_inventory: list['dict'], + ap_csv: list['dict'] + ) -> list['dict']: + payload = [] + + for site in inventory_payloads: + for mac in site['macs']: + inventory_item = find_inventory_item_by_mac_address(mist_inventory, mac) + csv_item = find_csv_item_by_mac_address(ap_csv, mac) + payload.append( + { + "new_ap_name": csv_item['Host Name'], + "site_id": site['site_id'], + "ap_id": inventory_item['id'], + } + ) + + return payload + +def find_inventory_item_by_mac_address( + mist_inventory: list['dict'], + mac: str + ) -> dict: + for item in mist_inventory: + if item['mac'] == mac: + return item + raise ValueError(f"Unable to find item with Mac Address: '{mac}'") + +def find_csv_item_by_mac_address( + ap_csv: list['dict'], + mac: str + ) -> list[dict]: + for item in ap_csv: + if item['MAC Address'].replace(":", "") == mac: + return item + raise ValueError(f"Unable to find item name with Mac Address: '{mac}' in csv") + + +def juniper_ap_assign( + ap_csv: list['dict'], + org_id=None, + mist_username=None, + mist_login_method=None +): + admin = JuniperClient(mist_username, mist_login_method) + mist_sites_configs = admin.get('/api/v1/orgs/' + org_id + '/sites') + mist_inventory = admin.get('/api/v1/orgs/' + org_id + '/inventory') + + validate_if_site_defined_on_csv_exists_in_mist(mist_sites_configs, ap_csv) + validate_if_ap_defined_on_csv_exists_in_mist(mist_inventory, ap_csv) + + site_and_mac_reservations_without_mist_site_ids = format_site_and_mac_reservations( + ap_csv + ) + + site_and_mac_reservations_including_mist_site_ids = find_site_id( + site_and_mac_reservations_without_mist_site_ids, + mist_sites_configs + ) + + inventory_payloads = get_site_payload_for_inventory_assign_to_site(site_and_mac_reservations_including_mist_site_ids) + + plan_of_action(inventory_payloads) + + for site_payload in inventory_payloads: + result = admin.put('/api/v1/orgs/' + org_id + '/inventory', site_payload) + if result == False: + print('Failed to assign ap {}'.format(site_payload)) + else: + print('Successfully assigned ap {}'.format(site_payload)) + + rename_ap_payload = build_rename_ap_payload(inventory_payloads,mist_inventory, ap_csv) + plan_of_action(rename_ap_payload) + + for ap_item in rename_ap_payload: + rename_ap( + admin, + ap_item['site_id'], + ap_item['ap_id'], + ap_item['new_ap_name'] + ) \ No newline at end of file diff --git a/src/juniper_client.py b/src/juniper_client.py index bc563d1..a3c9507 100644 --- a/src/juniper_client.py +++ b/src/juniper_client.py @@ -90,3 +90,21 @@ def put(self, url, payload): return False return json.loads(response.text) + + def get(self, url): + url = 'https://api.eu.mist.com{}'.format(url) + session = self.session + headers = self.headers + + print('GET {}'.format(url)) + response = session.get(url, headers=headers) + + if response.status_code != 200: + print('Failed to GET') + print('\tURL: {}'.format(url)) + print('\tResponse: {} ({})'.format( + response.text, response.status_code)) + return False + + return json.loads(response.text) + diff --git a/src/juniper.py b/src/juniper_site_creation.py similarity index 81% rename from src/juniper.py rename to src/juniper_site_creation.py index 2bfec1d..7859346 100644 --- a/src/juniper.py +++ b/src/juniper_site_creation.py @@ -1,10 +1,30 @@ import json import sys -from datetime import datetime -from build_juniper_payload import BuildPayload +from build_juniper_payload import BuildPayload, plan_of_action from juniper_client import JuniperClient +from geocode import geocode, find_timezone, find_country_code +import time +def add_geocoding_to_json(data): + for d in data: + # Variables + site_id = None + site = {'name': d.get('Site Name', ''), + 'address': d.get('Site Name', '') + } + + gps = geocode(d.get('Site Address', '')) + country_code = find_country_code(gps) + time_zone = find_timezone(gps) + + # Adding new key-value pairs to the dictionary + d['gps'] = gps + d['country_code'] = country_code + d['time_zone'] = time_zone + time.sleep(1) + return data + def warn_if_using_org_id_production(org_id): production_org_ids = [ '3e824dd6-6b37-4cc7-90bb-97d744e91175', @@ -23,36 +43,6 @@ def warn_if_using_org_id_production(org_id): raise ValueError('Invalid input') -def plan_of_action( - payload_processor -): - - # Generate a timestamp for the filename - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - plan_file_name = "../data_src/mist_plan_{time}.json".format(time=timestamp) - - processed_payload = payload_processor.get_site_payload() - - # Load file - with open(plan_file_name, "w") as plan_file: - json.dump(processed_payload, plan_file, indent=2) - - # Print to terminal - with open(plan_file_name, "r") as plan_file: - print(plan_file.read()) - - print("A file containing all the changes has been created: {file}".format( - file=plan_file_name)) - user_input = input("Do you wish to continue? (y/n): ").upper() - - if user_input == "Y": - print("Continuing with run") - elif user_input == "N": - sys.exit(0) - else: - raise ValueError('Invalid input') - - def validate_user_defined_config_variables( org_id, site_group_ids, @@ -79,8 +69,8 @@ def validate_user_defined_config_variables( # Main function -def juniper_script( - data, +def juniper_script_site_creation( + json_data_without_geocoding, org_id=None, mist_username=None, mist_login_method=None, @@ -89,6 +79,10 @@ def juniper_script( network_template_id=None, ap_versions=None ): + + data = add_geocoding_to_json( + json_data_without_geocoding) + # Configure True/False to enable/disable additional logging of the API response objects show_more_details = True @@ -112,7 +106,7 @@ def juniper_script( ) plan_of_action( - payload_processor + payload_processor.get_site_payload() ) # Establish Mist session diff --git a/src/main.py b/src/main.py index 0806ec2..6781ede 100644 --- a/src/main.py +++ b/src/main.py @@ -1,7 +1,7 @@ -from juniper import juniper_script +from juniper_site_creation import juniper_script_site_creation +from juniper_ap_staging import juniper_ap_assign import os import csv -from geocode import geocode, find_timezone, find_country_code # Convert CSV file to JSON object. @@ -24,42 +24,37 @@ def convert_csv_to_json(file_path): return csv_rows -def add_geocoding_to_json(data): - for d in data: - # Variables - site_id = None - site = {'name': d.get('Site Name', ''), - 'address': d.get('Site Name', '') - } - - gps = geocode(d.get('Site Address', '')) - country_code = find_country_code(gps) - time_zone = find_timezone(gps) - - # Adding new key-value pairs to the dictionary - d['gps'] = gps - d['country_code'] = country_code - d['time_zone'] = time_zone - return data - - if __name__ == '__main__': - csv_file_path = os.getcwd() + '/../data_src/sites_with_clients.csv' + csv_file_path = os.getcwd() + '/../data_src/' + os.environ.get('CSV_FILE_NAME') # Convert CSV to valid JSON - json_data_without_geocoding = convert_csv_to_json(csv_file_path) + json_data = convert_csv_to_json(csv_file_path) + + operation = os.environ.get('OPERATION') + + if operation is None: + raise EnvironmentError("The OPERATION environment variable is not set.") + + if operation == "site_creation": + juniper_script_site_creation( + mist_username=os.environ.get('MIST_USERNAME'), + mist_login_method=os.environ.get('MIST_LOGIN_METHOD'), + site_group_ids=os.environ.get('SITE_GROUP_IDS'), + org_id=os.environ.get('ORG_ID'), + rf_template_id=os.environ.get('RF_TEMPLATE_ID'), + json_data_without_geocoding=json_data, + network_template_id=os.environ.get('NETWORK_TEMPLATE_ID'), + ap_versions=os.environ.get('AP_VERSIONS') + ) + elif operation == "ap_staging": + juniper_ap_assign( + ap_csv=json_data, + mist_username=os.environ.get('MIST_USERNAME'), + mist_login_method=os.environ.get('MIST_LOGIN_METHOD'), + org_id=os.environ.get('ORG_ID') + ) + else: + raise ValueError(f"The OPERATION '{operation}' is not recognized.") - json_data_with_geocoding = add_geocoding_to_json( - json_data_without_geocoding) - juniper_script( - mist_username=os.environ.get('MIST_USERNAME'), - mist_login_method=os.environ.get('MIST_LOGIN_METHOD'), - site_group_ids=os.environ.get('SITE_GROUP_IDS'), - org_id=os.environ.get('ORG_ID'), - rf_template_id=os.environ.get('RF_TEMPLATE_ID'), - data=json_data_with_geocoding, - network_template_id=os.environ.get('NETWORK_TEMPLATE_ID'), - ap_versions=os.environ.get('AP_VERSIONS') - ) diff --git a/test/test_build_juniper_payload.py b/test/test_build_juniper_payload.py index 9dce68b..e8e57cc 100644 --- a/test/test_build_juniper_payload.py +++ b/test/test_build_juniper_payload.py @@ -1,7 +1,9 @@ -from src.build_juniper_payload import BuildPayload +from src.build_juniper_payload import BuildPayload, plan_of_action import unittest from unittest.mock import patch import json +from datetime import datetime +import os class TestBuildPayLoad(unittest.TestCase): @@ -253,3 +255,37 @@ def test_append_neither_wifi(self): result = self.payload_processor._check_if_we_need_to_append_gov_wifi_or_moj_wifi_site_groups( gov_wifi, moj_wifi, self.site_group_ids) self.assertEqual(result, []) + +class TestPlanOfActionFunction(unittest.TestCase): + + + def test_plan_of_action_creates_file(self): + + json_payload = {"Hello": "hi"} + + with patch('builtins.input', return_value='Y'), patch('sys.exit') as mock_exit: + plan_of_action(json_payload) + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + expected_file_name = f"../data_src/mist_plan_{timestamp}.json" + self.assertTrue(os.path.exists(expected_file_name)) + os.remove(expected_file_name) + + @patch("builtins.open") + def test_plan_of_action_exit_on_no(self, mock_open_file): + json_payload = {"Hello": "hi"} + + with patch('builtins.input', return_value='N'), self.assertRaises(SystemExit) as cm: + plan_of_action(json_payload) + self.assertEqual(cm.exception.code, 0) + + + @patch("builtins.open") + def test_plan_of_action_invalid_input(self, mock_open_file): + json_payload = {"Hello": "hi"} + + # Mocking the built-in input + with patch('builtins.input', return_value='invalid_input'): + with self.assertRaises(ValueError) as cm: + plan_of_action(json_payload) + + self.assertEqual(str(cm.exception), 'Invalid input') \ No newline at end of file diff --git a/test/test_juniper_ap_staging.py b/test/test_juniper_ap_staging.py new file mode 100644 index 0000000..762c94d --- /dev/null +++ b/test/test_juniper_ap_staging.py @@ -0,0 +1,112 @@ +import unittest + + +def validate_if_site_defined_on_csv_exists_in_mist(mist_sites_configs: list['dict'], + ap_csv: list['dict']): + mist_site_names = [] + for mist_site_config in mist_sites_configs: + mist_site_names.append(mist_site_config['name']) + + mist_site_names_csv_file = [] + for ap_row in ap_csv: + mist_site_names_csv_file.append(ap_row['Site Name']) + + for site_name in mist_site_names_csv_file: + if site_name not in mist_site_names: + raise ValueError(f"Site name '{site_name}' found in CSV file but not in mist site configurations.") + + +class TestValidateIfSiteDefinedOnCSVExistsInMist(unittest.TestCase): + + def test_all_sites_exist(self): + mist_sites_configs = [{'name': 'SiteA'}, {'name': 'SiteB'}, {'name': 'SiteC'}] + ap_csv = [{'Site Name': 'SiteA'}, {'Site Name': 'SiteB'}, {'Site Name': 'SiteC'}] + try: + validate_if_site_defined_on_csv_exists_in_mist(mist_sites_configs, ap_csv) + except ValueError: + self.fail("validate_if_site_defined_on_csv_exists_in_mist() raised ValueError unexpectedly!") + + def test_site_not_exist(self): + mist_sites_configs = [{'name': 'SiteA'}, {'name': 'SiteB'}, {'name': 'SiteC'}] + ap_csv = [{'Site Name': 'SiteA'}, {'Site Name': 'SiteD'}] + with self.assertRaises(ValueError) as cm: + validate_if_site_defined_on_csv_exists_in_mist(mist_sites_configs, ap_csv) + self.assertEqual(str(cm.exception), "Site name 'SiteD' found in CSV file but not in mist site configurations.") + + def test_empty_mist_sites_configs(self): + mist_sites_configs = [] + ap_csv = [{'Site Name': 'SiteA'}, {'Site Name': 'SiteB'}] + with self.assertRaises(ValueError) as cm: + validate_if_site_defined_on_csv_exists_in_mist(mist_sites_configs, ap_csv) + self.assertEqual(str(cm.exception), "Site name 'SiteA' found in CSV file but not in mist site configurations.") + + def test_empty_ap_csv(self): + mist_sites_configs = [{'name': 'SiteA'}, {'name': 'SiteB'}] + ap_csv = [] + try: + validate_if_site_defined_on_csv_exists_in_mist(mist_sites_configs, ap_csv) + except ValueError: + self.fail("validate_if_site_defined_on_csv_exists_in_mist() raised ValueError unexpectedly!") + + def test_both_lists_empty(self): + mist_sites_configs = [] + ap_csv = [] + try: + validate_if_site_defined_on_csv_exists_in_mist(mist_sites_configs, ap_csv) + except ValueError: + self.fail("validate_if_site_defined_on_csv_exists_in_mist() raised ValueError unexpectedly!") + + +def validate_if_ap_defined_on_csv_exists_in_mist(mist_inventory, ap_csv): + ap_mac_addresses_from_mist = [] + for inventory_item in mist_inventory: + if inventory_item['type'] == 'ap': + ap_mac_addresses_from_mist.append(inventory_item['mac']) + + ap_mac_addresses_from_csv = [] + for ap_row in ap_csv: + cleaned_mac_address = ap_row['MAC Address'].replace(":", "") + ap_mac_addresses_from_csv.append(cleaned_mac_address) + + for site_name in ap_mac_addresses_from_csv: + if site_name not in ap_mac_addresses_from_mist: + raise ValueError(f"Site name '{site_name}' found in CSV file but not in mist site configurations.") + + +class TestValidateAP(unittest.TestCase): + + def test_all_aps_exist(self): + mist_inventory = [{'type': 'ap', 'mac': 'aabbccddeeff'}, {'type': 'ap', 'mac': '112233445566'}] + ap_csv = [{'MAC Address': 'aa:bb:cc:dd:ee:ff'}, {'MAC Address': '11:22:33:44:55:66'}] + try: + validate_if_ap_defined_on_csv_exists_in_mist(mist_inventory, ap_csv) + except ValueError: + self.fail("validate_if_ap_defined_on_csv_exists_in_mist raised ValueError unexpectedly!") + + def test_missing_ap_in_mist(self): + mist_inventory = [{'type': 'ap', 'mac': 'aabbccddeeff'}] + ap_csv = [{'MAC Address': 'aa:bb:cc:dd:ee:ff'}, {'MAC Address': '11:22:33:44:55:66'}] + with self.assertRaises(ValueError) as context: + validate_if_ap_defined_on_csv_exists_in_mist(mist_inventory, ap_csv) + self.assertIn("Site name '112233445566' found in CSV file but not in mist site configurations.", + str(context.exception)) + + def test_empty_csv(self): + mist_inventory = [{'type': 'ap', 'mac': 'aabbccddeeff'}, {'type': 'ap', 'mac': '112233445566'}] + ap_csv = [] + try: + validate_if_ap_defined_on_csv_exists_in_mist(mist_inventory, ap_csv) + except ValueError: + self.fail("validate_if_ap_defined_on_csv_exists_in_mist raised ValueError unexpectedly!") + + def test_mixed_inventory(self): + mist_inventory = [{'type': 'ap', 'mac': 'aabbccddeeff'}, {'type': 'switch', 'mac': '112233445566'}] + ap_csv = [{'MAC Address': 'aa:bb:cc:dd:ee:ff'}] + try: + validate_if_ap_defined_on_csv_exists_in_mist(mist_inventory, ap_csv) + except ValueError: + self.fail("validate_if_ap_defined_on_csv_exists_in_mist raised ValueError unexpectedly!") + + +if __name__ == '__main__': + unittest.main() diff --git a/test/test_juniper_client.py b/test/test_juniper_client.py index 36b696f..5c0a716 100644 --- a/test/test_juniper_client.py +++ b/test/test_juniper_client.py @@ -2,7 +2,6 @@ from unittest.mock import patch, MagicMock from src.juniper_client import JuniperClient - class TestJuniperClient(unittest.TestCase): # Mocking the input function to provide a static MFA code diff --git a/test/test_juniper.py b/test/test_juniper_site_creation.py similarity index 64% rename from test/test_juniper.py rename to test/test_juniper_site_creation.py index 82c4b78..7bddb34 100644 --- a/test/test_juniper.py +++ b/test/test_juniper_site_creation.py @@ -1,18 +1,15 @@ import unittest from unittest.mock import patch, MagicMock -from src.juniper import juniper_script, \ - warn_if_using_org_id_production, plan_of_action +from src.juniper_site_creation import juniper_script_site_creation, \ + warn_if_using_org_id_production, add_geocoding_to_json from io import StringIO -from datetime import datetime -import os import pytest from parameterized import parameterized - class TestJuniperScript(unittest.TestCase): - @patch('src.juniper.plan_of_action') - @patch('src.juniper.JuniperClient') + @patch('src.juniper_site_creation.plan_of_action') + @patch('src.juniper_site_creation.JuniperClient') def test_given_successful_login_when_juniper_script_runs_post_a_site(self, mock_juniper_client, mock_plan_of_action): # Mock Mist API responses @@ -24,15 +21,14 @@ def test_given_successful_login_when_juniper_script_runs_post_a_site(self, mock_ # Sample input data data = [ - {'Site Name': 'TestSite', 'Site Address': '123 Main St', - 'gps': [1.23, 4.56], 'country_code': 'US', 'time_zone': 'UTC', + {'Site Name': 'TestSite', 'Site Address': 'Met Office, FitzRoy Road, Exeter, Devon, EX1 3PB', 'Enable GovWifi': 'true', 'Enable MoJWifi': 'false', 'Wired NACS Radius Key': 'key1', 'GovWifi Radius Key': 'key2'} ] # Call the function - juniper_script( - data, + juniper_script_site_creation( + json_data_without_geocoding=data, mist_login_method='token', org_id='your_org_id', site_group_ids='{"moj_wifi": "foo","gov_wifi": "bar"}', @@ -42,15 +38,16 @@ def test_given_successful_login_when_juniper_script_runs_post_a_site(self, mock_ ) mock_post.assert_called_once_with('/api/v1/orgs/your_org_id/sites', - {'name': 'TestSite', 'address': '123 Main St', - 'latlng': {'lat': 1.23, 'lng': 4.56}, 'country_code': 'US', - 'rftemplate_id': '8542a5fa-51e4-41be-83b9-acb416362cc0', - 'networktemplate_id': '46b87163-abd2-4b08-a67f-1ccecfcfd061', - 'timezone': 'UTC', 'sitegroup_ids': []}) + {'name': 'TestSite', 'address': 'Met Office, FitzRoy Road, Exeter, Devon, EX1 3PB', + 'latlng': {'lat': 50.727350349999995, 'lng': -3.4744726127760086}, 'country_code': 'GB', + 'rftemplate_id': '8542a5fa-51e4-41be-83b9-acb416362cc0', + 'networktemplate_id': '46b87163-abd2-4b08-a67f-1ccecfcfd061', 'timezone': 'Europe/London', + 'sitegroup_ids': []} + ) def test_juniper_script_missing_site_group_ids(self): with self.assertRaises(ValueError) as cm: - juniper_script([], org_id='your_org_id', mist_login_method='token') + juniper_script_site_creation([], org_id='your_org_id', mist_login_method='token') self.assertEqual(str(cm.exception), 'Must provide site_group_ids for GovWifi & MoJWifi') @@ -58,7 +55,7 @@ def test_juniper_script_missing_site_group_ids(self): def test_juniper_script_missing_rf_template_id(self): # Test when rf_template_id is missing with self.assertRaises(ValueError) as cm: - juniper_script([], + juniper_script_site_creation([], org_id='your_org_id', mist_login_method='token', site_group_ids={ @@ -108,7 +105,7 @@ def test_given_production_org_id_when_user_prompted_for_input_and_user_inputs_in def test_juniper_script_missing_network_template_id(self): # Test when network_template_id is missing with self.assertRaises(ValueError) as cm: - juniper_script([], + juniper_script_site_creation([], org_id='your_org_id', mist_login_method='token', site_group_ids={ @@ -119,12 +116,12 @@ def test_juniper_script_missing_network_template_id(self): self.assertEqual(str(cm.exception), 'Must define network_template_id') - @patch('src.juniper.plan_of_action') - @patch('src.juniper.JuniperClient') + @patch('src.juniper_site_creation.plan_of_action') + @patch('src.juniper_site_creation.JuniperClient') def test_given_mist_login_method_not_defined_then_default_to_credentials(self, mock_admin, mock_plan_of_action): output_buffer = StringIO() with patch('sys.stdout', new_callable=StringIO) as mock_stdout: - juniper_script([], + juniper_script_site_creation([], org_id='your_org_id', site_group_ids={ 'moj_wifi': '0b33c61d-8f51-4757-a14d-29263421a904', @@ -143,53 +140,47 @@ def test_given_mist_login_method_not_defined_then_default_to_credentials(self, m def test_juniper_script_missing_org_id(self): # Test when org_id is missing with self.assertRaises(ValueError) as cm: - juniper_script([], org_id=None) + juniper_script_site_creation([], org_id=None) self.assertEqual(str(cm.exception), 'Please provide Mist org_id') +class TestAddGeocodingToJson(unittest.TestCase): + + @patch('src.juniper_site_creation.geocode', side_effect=[ + {'latitude': 50.3868633, 'longitude': -4.1539256}, + {'latitude': 51.499929300000005, 'longitude': -0.13477761285315926}, + {'latitude': 50.727350349999995, 'longitude': -3.4744726127760086}, + ]) + @patch('src.juniper_site_creation.find_country_code', return_value='GB') + @patch('src.juniper_site_creation.find_timezone', return_value='Europe/London') + def test_given_site_name_and_site_address_in_json_format_when_function_called_then_add_geocode_country_code_and_time_zone( + self, + find_timezone, + mock_find_country_code, + mock_geocode + ): + # Test if the function adds geocoding information correctly + data = [ + {'Site Name': 'Site1', 'Site Address': '40 Mayflower Dr, Plymouth PL2 3DG'}, + {'Site Name': 'Site2', 'Site Address': '102 Petty France, London SW1H 9AJ'}, + {'Site Name': 'Site3', + 'Site Address': 'Met Office, FitzRoy Road, Exeter, Devon, EX1 3PB'} + ] -class TestPlanOfActionFunction(unittest.TestCase): - - @patch("src.juniper.BuildPayload") - def test_plan_of_action_creates_file(self, mock_payload_processor): - - # Mocking the payload processor - mock_processor_instance = mock_payload_processor.return_value - mock_processor_instance.get_site_payload.return_value = { - "hello": "test"} - - with patch('builtins.input', return_value='Y'), patch('sys.exit') as mock_exit: - plan_of_action(mock_processor_instance) - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - expected_file_name = f"../data_src/mist_plan_{timestamp}.json" - self.assertTrue(os.path.exists(expected_file_name)) - os.remove(expected_file_name) - - @patch("src.juniper.BuildPayload") - @patch("builtins.open") - def test_plan_of_action_exit_on_no(self, mock_open_file, mock_payload_processor): - - # Mocking the payload processor - mock_processor_instance = mock_payload_processor.return_value - mock_processor_instance.get_site_payload.return_value = { - "hello": "test"} - - with patch('builtins.input', return_value='N'), self.assertRaises(SystemExit) as cm: - plan_of_action(mock_processor_instance) - self.assertEqual(cm.exception.code, 0) - - @patch("src.juniper.BuildPayload") - @patch("builtins.open") - def test_plan_of_action_invalid_input(self, mock_open_file, mock_payload_processor): - - # Mocking the payload processor - mock_processor_instance = mock_payload_processor.return_value - mock_processor_instance.get_site_payload.return_value = { - "hello": "test"} + expected_data = [ + {'Site Name': 'Site1', 'Site Address': '40 Mayflower Dr, Plymouth PL2 3DG', 'gps': { + 'latitude': 50.3868633, 'longitude': -4.1539256}, 'country_code': 'GB', 'time_zone': 'Europe/London'}, + {'Site Name': 'Site2', 'Site Address': '102 Petty France, London SW1H 9AJ', 'gps': { + 'latitude': 51.499929300000005, 'longitude': -0.13477761285315926}, 'country_code': 'GB', + 'time_zone': 'Europe/London'}, + {'Site Name': 'Site3', 'Site Address': 'Met Office, FitzRoy Road, Exeter, Devon, EX1 3PB', 'gps': { + 'latitude': 50.727350349999995, 'longitude': -3.4744726127760086}, 'country_code': 'GB', + 'time_zone': 'Europe/London'} + ] - # Mocking the built-in input - with patch('builtins.input', return_value='invalid_input'): - with self.assertRaises(ValueError) as cm: - plan_of_action(mock_processor_instance) + actual_data = add_geocoding_to_json(data) - self.assertEqual(str(cm.exception), 'Invalid input') + self.assertEqual(actual_data, expected_data) + find_timezone.assert_called() + mock_find_country_code.assert_called() + mock_geocode.assert_called() diff --git a/test/test_main.py b/test/test_main.py index 5a66fda..fb007ca 100644 --- a/test/test_main.py +++ b/test/test_main.py @@ -1,9 +1,7 @@ +from src.main import convert_csv_to_json import unittest import tempfile import csv -from unittest.mock import patch -from src.main import convert_csv_to_json, add_geocoding_to_json - class TestCsvToJson(unittest.TestCase): @@ -89,45 +87,3 @@ def test_given_csv_while_when_csv_file_contains_nbsp_then_convert_to_normal_spac expected_json = self.csv_data actual_json = convert_csv_to_json(csv_file.name) self.assertEqual(actual_json, expected_json) - - -class TestAddGeocodingToJson(unittest.TestCase): - - @patch('src.main.geocode', side_effect=[ - {'latitude': 50.3868633, 'longitude': -4.1539256}, - {'latitude': 51.499929300000005, 'longitude': -0.13477761285315926}, - {'latitude': 50.727350349999995, 'longitude': -3.4744726127760086}, - ]) - @patch('src.main.find_country_code', return_value='GB') - @patch('src.main.find_timezone', return_value='Europe/London') - def test_given_site_name_and_site_address_in_json_format_when_function_called_then_add_geocode_country_code_and_time_zone( - self, - find_timezone, - mock_find_country_code, - mock_geocode - ): - # Test if the function adds geocoding information correctly - data = [ - {'Site Name': 'Site1', 'Site Address': '40 Mayflower Dr, Plymouth PL2 3DG'}, - {'Site Name': 'Site2', 'Site Address': '102 Petty France, London SW1H 9AJ'}, - {'Site Name': 'Site3', - 'Site Address': 'Met Office, FitzRoy Road, Exeter, Devon, EX1 3PB'} - ] - - expected_data = [ - {'Site Name': 'Site1', 'Site Address': '40 Mayflower Dr, Plymouth PL2 3DG', 'gps': { - 'latitude': 50.3868633, 'longitude': -4.1539256}, 'country_code': 'GB', 'time_zone': 'Europe/London'}, - {'Site Name': 'Site2', 'Site Address': '102 Petty France, London SW1H 9AJ', 'gps': { - 'latitude': 51.499929300000005, 'longitude': -0.13477761285315926}, 'country_code': 'GB', - 'time_zone': 'Europe/London'}, - {'Site Name': 'Site3', 'Site Address': 'Met Office, FitzRoy Road, Exeter, Devon, EX1 3PB', 'gps': { - 'latitude': 50.727350349999995, 'longitude': -3.4744726127760086}, 'country_code': 'GB', - 'time_zone': 'Europe/London'} - ] - - actual_data = add_geocoding_to_json(data) - - self.assertEqual(actual_data, expected_data) - find_timezone.assert_called() - mock_find_country_code.assert_called() - mock_geocode.assert_called() From 2cd9b67f4e51a35bfef2ca9f21a77151a682509d Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 29 May 2024 10:03:00 +0000 Subject: [PATCH 2/2] Commit changes made by code formatters --- src/build_juniper_payload.py | 4 +- src/juniper_ap_staging.py | 40 +++++++++------ src/juniper_client.py | 1 - src/juniper_site_creation.py | 1 + src/main.py | 5 +- test/test_build_juniper_payload.py | 5 +- test/test_juniper_ap_staging.py | 81 ++++++++++++++++++++---------- test/test_juniper_client.py | 1 + test/test_juniper_site_creation.py | 65 ++++++++++++------------ test/test_main.py | 1 + 10 files changed, 124 insertions(+), 80 deletions(-) diff --git a/src/build_juniper_payload.py b/src/build_juniper_payload.py index e23309e..3aaa50f 100644 --- a/src/build_juniper_payload.py +++ b/src/build_juniper_payload.py @@ -2,6 +2,7 @@ from datetime import datetime import sys + class BuildPayload: def __init__(self, data: list, @@ -222,6 +223,7 @@ def _build_site_payload( return json_objects + def plan_of_action( processed_payload ): @@ -245,4 +247,4 @@ def plan_of_action( elif user_input == "N": sys.exit(0) else: - raise ValueError('Invalid input') \ No newline at end of file + raise ValueError('Invalid input') diff --git a/src/juniper_ap_staging.py b/src/juniper_ap_staging.py index f199cd3..081e988 100644 --- a/src/juniper_ap_staging.py +++ b/src/juniper_ap_staging.py @@ -14,7 +14,8 @@ def validate_if_site_defined_on_csv_exists_in_mist(mist_sites_configs: list['dic for site_name in mist_site_names_csv_file: if site_name not in mist_site_names: - raise ValueError(f"Site name '{site_name}' found in CSV file but not in mist site configurations.") + raise ValueError( + f"Site name '{site_name}' found in CSV file but not in mist site configurations.") def validate_if_ap_defined_on_csv_exists_in_mist(mist_inventory: list['dict'], @@ -31,7 +32,8 @@ def validate_if_ap_defined_on_csv_exists_in_mist(mist_inventory: list['dict'], for site_name in ap_mac_addresses_from_csv: if site_name not in ap_mac_addresses_from_mist: - raise ValueError(f"Site name '{site_name}' found in CSV file but not in mist site configurations.") + raise ValueError( + f"Site name '{site_name}' found in CSV file but not in mist site configurations.") def find_site_by_name(site_name: str, @@ -73,7 +75,7 @@ def format_site_and_mac_reservations(ap_csv: list['dict'] def find_site_id( site_and_mac_reservations_without_mist_site_ids: list['dict'], mist_sites_configs: list['dict'] - ) -> list['dict']: +) -> list['dict']: for site_csv in site_and_mac_reservations_without_mist_site_ids: site_found = False @@ -83,14 +85,15 @@ def find_site_id( site_found = True break if not site_found: - raise ValueError(f"No matching site_id found for site: {site_csv['SiteName']}") + raise ValueError( + f"No matching site_id found for site: {site_csv['SiteName']}") return site_and_mac_reservations_without_mist_site_ids def get_site_payload_for_inventory_assign_to_site( site_and_mac_reservations_including_mist_site_ids: list['dict'] - ) -> list['dict']: +) -> list['dict']: payload = [] for site in site_and_mac_reservations_including_mist_site_ids: @@ -109,6 +112,7 @@ def get_site_payload_for_inventory_assign_to_site( return payload + def rename_ap(admin: object, site_id: str, ap_id: str, @@ -120,16 +124,18 @@ def rename_ap(admin: object, else: print('Successfully renamed ap to new desired name {}'.format(new_ap_name)) + def build_rename_ap_payload( inventory_payloads: list['dict'], mist_inventory: list['dict'], ap_csv: list['dict'] - ) -> list['dict']: +) -> list['dict']: payload = [] for site in inventory_payloads: for mac in site['macs']: - inventory_item = find_inventory_item_by_mac_address(mist_inventory, mac) + inventory_item = find_inventory_item_by_mac_address( + mist_inventory, mac) csv_item = find_csv_item_by_mac_address(ap_csv, mac) payload.append( { @@ -141,23 +147,26 @@ def build_rename_ap_payload( return payload + def find_inventory_item_by_mac_address( mist_inventory: list['dict'], mac: str - ) -> dict: +) -> dict: for item in mist_inventory: if item['mac'] == mac: return item raise ValueError(f"Unable to find item with Mac Address: '{mac}'") + def find_csv_item_by_mac_address( ap_csv: list['dict'], mac: str - ) -> list[dict]: +) -> list[dict]: for item in ap_csv: if item['MAC Address'].replace(":", "") == mac: return item - raise ValueError(f"Unable to find item name with Mac Address: '{mac}' in csv") + raise ValueError( + f"Unable to find item name with Mac Address: '{mac}' in csv") def juniper_ap_assign( @@ -182,18 +191,21 @@ def juniper_ap_assign( mist_sites_configs ) - inventory_payloads = get_site_payload_for_inventory_assign_to_site(site_and_mac_reservations_including_mist_site_ids) + inventory_payloads = get_site_payload_for_inventory_assign_to_site( + site_and_mac_reservations_including_mist_site_ids) plan_of_action(inventory_payloads) for site_payload in inventory_payloads: - result = admin.put('/api/v1/orgs/' + org_id + '/inventory', site_payload) + result = admin.put('/api/v1/orgs/' + org_id + + '/inventory', site_payload) if result == False: print('Failed to assign ap {}'.format(site_payload)) else: print('Successfully assigned ap {}'.format(site_payload)) - rename_ap_payload = build_rename_ap_payload(inventory_payloads,mist_inventory, ap_csv) + rename_ap_payload = build_rename_ap_payload( + inventory_payloads, mist_inventory, ap_csv) plan_of_action(rename_ap_payload) for ap_item in rename_ap_payload: @@ -202,4 +214,4 @@ def juniper_ap_assign( ap_item['site_id'], ap_item['ap_id'], ap_item['new_ap_name'] - ) \ No newline at end of file + ) diff --git a/src/juniper_client.py b/src/juniper_client.py index a3c9507..8a1299a 100644 --- a/src/juniper_client.py +++ b/src/juniper_client.py @@ -107,4 +107,3 @@ def get(self, url): return False return json.loads(response.text) - diff --git a/src/juniper_site_creation.py b/src/juniper_site_creation.py index 7859346..04644bf 100644 --- a/src/juniper_site_creation.py +++ b/src/juniper_site_creation.py @@ -25,6 +25,7 @@ def add_geocoding_to_json(data): time.sleep(1) return data + def warn_if_using_org_id_production(org_id): production_org_ids = [ '3e824dd6-6b37-4cc7-90bb-97d744e91175', diff --git a/src/main.py b/src/main.py index 6781ede..6e1bf22 100644 --- a/src/main.py +++ b/src/main.py @@ -34,7 +34,8 @@ def convert_csv_to_json(file_path): operation = os.environ.get('OPERATION') if operation is None: - raise EnvironmentError("The OPERATION environment variable is not set.") + raise EnvironmentError( + "The OPERATION environment variable is not set.") if operation == "site_creation": juniper_script_site_creation( @@ -56,5 +57,3 @@ def convert_csv_to_json(file_path): ) else: raise ValueError(f"The OPERATION '{operation}' is not recognized.") - - diff --git a/test/test_build_juniper_payload.py b/test/test_build_juniper_payload.py index e8e57cc..27c1a0d 100644 --- a/test/test_build_juniper_payload.py +++ b/test/test_build_juniper_payload.py @@ -256,8 +256,8 @@ def test_append_neither_wifi(self): gov_wifi, moj_wifi, self.site_group_ids) self.assertEqual(result, []) -class TestPlanOfActionFunction(unittest.TestCase): +class TestPlanOfActionFunction(unittest.TestCase): def test_plan_of_action_creates_file(self): @@ -278,7 +278,6 @@ def test_plan_of_action_exit_on_no(self, mock_open_file): plan_of_action(json_payload) self.assertEqual(cm.exception.code, 0) - @patch("builtins.open") def test_plan_of_action_invalid_input(self, mock_open_file): json_payload = {"Hello": "hi"} @@ -288,4 +287,4 @@ def test_plan_of_action_invalid_input(self, mock_open_file): with self.assertRaises(ValueError) as cm: plan_of_action(json_payload) - self.assertEqual(str(cm.exception), 'Invalid input') \ No newline at end of file + self.assertEqual(str(cm.exception), 'Invalid input') diff --git a/test/test_juniper_ap_staging.py b/test/test_juniper_ap_staging.py index 762c94d..dff6183 100644 --- a/test/test_juniper_ap_staging.py +++ b/test/test_juniper_ap_staging.py @@ -13,48 +13,62 @@ def validate_if_site_defined_on_csv_exists_in_mist(mist_sites_configs: list['dic for site_name in mist_site_names_csv_file: if site_name not in mist_site_names: - raise ValueError(f"Site name '{site_name}' found in CSV file but not in mist site configurations.") + raise ValueError( + f"Site name '{site_name}' found in CSV file but not in mist site configurations.") class TestValidateIfSiteDefinedOnCSVExistsInMist(unittest.TestCase): def test_all_sites_exist(self): - mist_sites_configs = [{'name': 'SiteA'}, {'name': 'SiteB'}, {'name': 'SiteC'}] - ap_csv = [{'Site Name': 'SiteA'}, {'Site Name': 'SiteB'}, {'Site Name': 'SiteC'}] + mist_sites_configs = [{'name': 'SiteA'}, + {'name': 'SiteB'}, {'name': 'SiteC'}] + ap_csv = [{'Site Name': 'SiteA'}, { + 'Site Name': 'SiteB'}, {'Site Name': 'SiteC'}] try: - validate_if_site_defined_on_csv_exists_in_mist(mist_sites_configs, ap_csv) + validate_if_site_defined_on_csv_exists_in_mist( + mist_sites_configs, ap_csv) except ValueError: - self.fail("validate_if_site_defined_on_csv_exists_in_mist() raised ValueError unexpectedly!") + self.fail( + "validate_if_site_defined_on_csv_exists_in_mist() raised ValueError unexpectedly!") def test_site_not_exist(self): - mist_sites_configs = [{'name': 'SiteA'}, {'name': 'SiteB'}, {'name': 'SiteC'}] + mist_sites_configs = [{'name': 'SiteA'}, + {'name': 'SiteB'}, {'name': 'SiteC'}] ap_csv = [{'Site Name': 'SiteA'}, {'Site Name': 'SiteD'}] with self.assertRaises(ValueError) as cm: - validate_if_site_defined_on_csv_exists_in_mist(mist_sites_configs, ap_csv) - self.assertEqual(str(cm.exception), "Site name 'SiteD' found in CSV file but not in mist site configurations.") + validate_if_site_defined_on_csv_exists_in_mist( + mist_sites_configs, ap_csv) + self.assertEqual(str( + cm.exception), "Site name 'SiteD' found in CSV file but not in mist site configurations.") def test_empty_mist_sites_configs(self): mist_sites_configs = [] ap_csv = [{'Site Name': 'SiteA'}, {'Site Name': 'SiteB'}] with self.assertRaises(ValueError) as cm: - validate_if_site_defined_on_csv_exists_in_mist(mist_sites_configs, ap_csv) - self.assertEqual(str(cm.exception), "Site name 'SiteA' found in CSV file but not in mist site configurations.") + validate_if_site_defined_on_csv_exists_in_mist( + mist_sites_configs, ap_csv) + self.assertEqual(str( + cm.exception), "Site name 'SiteA' found in CSV file but not in mist site configurations.") def test_empty_ap_csv(self): mist_sites_configs = [{'name': 'SiteA'}, {'name': 'SiteB'}] ap_csv = [] try: - validate_if_site_defined_on_csv_exists_in_mist(mist_sites_configs, ap_csv) + validate_if_site_defined_on_csv_exists_in_mist( + mist_sites_configs, ap_csv) except ValueError: - self.fail("validate_if_site_defined_on_csv_exists_in_mist() raised ValueError unexpectedly!") + self.fail( + "validate_if_site_defined_on_csv_exists_in_mist() raised ValueError unexpectedly!") def test_both_lists_empty(self): mist_sites_configs = [] ap_csv = [] try: - validate_if_site_defined_on_csv_exists_in_mist(mist_sites_configs, ap_csv) + validate_if_site_defined_on_csv_exists_in_mist( + mist_sites_configs, ap_csv) except ValueError: - self.fail("validate_if_site_defined_on_csv_exists_in_mist() raised ValueError unexpectedly!") + self.fail( + "validate_if_site_defined_on_csv_exists_in_mist() raised ValueError unexpectedly!") def validate_if_ap_defined_on_csv_exists_in_mist(mist_inventory, ap_csv): @@ -70,42 +84,55 @@ def validate_if_ap_defined_on_csv_exists_in_mist(mist_inventory, ap_csv): for site_name in ap_mac_addresses_from_csv: if site_name not in ap_mac_addresses_from_mist: - raise ValueError(f"Site name '{site_name}' found in CSV file but not in mist site configurations.") + raise ValueError( + f"Site name '{site_name}' found in CSV file but not in mist site configurations.") class TestValidateAP(unittest.TestCase): def test_all_aps_exist(self): - mist_inventory = [{'type': 'ap', 'mac': 'aabbccddeeff'}, {'type': 'ap', 'mac': '112233445566'}] - ap_csv = [{'MAC Address': 'aa:bb:cc:dd:ee:ff'}, {'MAC Address': '11:22:33:44:55:66'}] + mist_inventory = [{'type': 'ap', 'mac': 'aabbccddeeff'}, { + 'type': 'ap', 'mac': '112233445566'}] + ap_csv = [{'MAC Address': 'aa:bb:cc:dd:ee:ff'}, + {'MAC Address': '11:22:33:44:55:66'}] try: - validate_if_ap_defined_on_csv_exists_in_mist(mist_inventory, ap_csv) + validate_if_ap_defined_on_csv_exists_in_mist( + mist_inventory, ap_csv) except ValueError: - self.fail("validate_if_ap_defined_on_csv_exists_in_mist raised ValueError unexpectedly!") + self.fail( + "validate_if_ap_defined_on_csv_exists_in_mist raised ValueError unexpectedly!") def test_missing_ap_in_mist(self): mist_inventory = [{'type': 'ap', 'mac': 'aabbccddeeff'}] - ap_csv = [{'MAC Address': 'aa:bb:cc:dd:ee:ff'}, {'MAC Address': '11:22:33:44:55:66'}] + ap_csv = [{'MAC Address': 'aa:bb:cc:dd:ee:ff'}, + {'MAC Address': '11:22:33:44:55:66'}] with self.assertRaises(ValueError) as context: - validate_if_ap_defined_on_csv_exists_in_mist(mist_inventory, ap_csv) + validate_if_ap_defined_on_csv_exists_in_mist( + mist_inventory, ap_csv) self.assertIn("Site name '112233445566' found in CSV file but not in mist site configurations.", str(context.exception)) def test_empty_csv(self): - mist_inventory = [{'type': 'ap', 'mac': 'aabbccddeeff'}, {'type': 'ap', 'mac': '112233445566'}] + mist_inventory = [{'type': 'ap', 'mac': 'aabbccddeeff'}, { + 'type': 'ap', 'mac': '112233445566'}] ap_csv = [] try: - validate_if_ap_defined_on_csv_exists_in_mist(mist_inventory, ap_csv) + validate_if_ap_defined_on_csv_exists_in_mist( + mist_inventory, ap_csv) except ValueError: - self.fail("validate_if_ap_defined_on_csv_exists_in_mist raised ValueError unexpectedly!") + self.fail( + "validate_if_ap_defined_on_csv_exists_in_mist raised ValueError unexpectedly!") def test_mixed_inventory(self): - mist_inventory = [{'type': 'ap', 'mac': 'aabbccddeeff'}, {'type': 'switch', 'mac': '112233445566'}] + mist_inventory = [{'type': 'ap', 'mac': 'aabbccddeeff'}, { + 'type': 'switch', 'mac': '112233445566'}] ap_csv = [{'MAC Address': 'aa:bb:cc:dd:ee:ff'}] try: - validate_if_ap_defined_on_csv_exists_in_mist(mist_inventory, ap_csv) + validate_if_ap_defined_on_csv_exists_in_mist( + mist_inventory, ap_csv) except ValueError: - self.fail("validate_if_ap_defined_on_csv_exists_in_mist raised ValueError unexpectedly!") + self.fail( + "validate_if_ap_defined_on_csv_exists_in_mist raised ValueError unexpectedly!") if __name__ == '__main__': diff --git a/test/test_juniper_client.py b/test/test_juniper_client.py index 5c0a716..36b696f 100644 --- a/test/test_juniper_client.py +++ b/test/test_juniper_client.py @@ -2,6 +2,7 @@ from unittest.mock import patch, MagicMock from src.juniper_client import JuniperClient + class TestJuniperClient(unittest.TestCase): # Mocking the input function to provide a static MFA code diff --git a/test/test_juniper_site_creation.py b/test/test_juniper_site_creation.py index 7bddb34..51eec36 100644 --- a/test/test_juniper_site_creation.py +++ b/test/test_juniper_site_creation.py @@ -6,6 +6,7 @@ import pytest from parameterized import parameterized + class TestJuniperScript(unittest.TestCase): @patch('src.juniper_site_creation.plan_of_action') @@ -38,16 +39,17 @@ def test_given_successful_login_when_juniper_script_runs_post_a_site(self, mock_ ) mock_post.assert_called_once_with('/api/v1/orgs/your_org_id/sites', - {'name': 'TestSite', 'address': 'Met Office, FitzRoy Road, Exeter, Devon, EX1 3PB', - 'latlng': {'lat': 50.727350349999995, 'lng': -3.4744726127760086}, 'country_code': 'GB', - 'rftemplate_id': '8542a5fa-51e4-41be-83b9-acb416362cc0', - 'networktemplate_id': '46b87163-abd2-4b08-a67f-1ccecfcfd061', 'timezone': 'Europe/London', - 'sitegroup_ids': []} - ) + {'name': 'TestSite', 'address': 'Met Office, FitzRoy Road, Exeter, Devon, EX1 3PB', + 'latlng': {'lat': 50.727350349999995, 'lng': -3.4744726127760086}, 'country_code': 'GB', + 'rftemplate_id': '8542a5fa-51e4-41be-83b9-acb416362cc0', + 'networktemplate_id': '46b87163-abd2-4b08-a67f-1ccecfcfd061', 'timezone': 'Europe/London', + 'sitegroup_ids': []} + ) def test_juniper_script_missing_site_group_ids(self): with self.assertRaises(ValueError) as cm: - juniper_script_site_creation([], org_id='your_org_id', mist_login_method='token') + juniper_script_site_creation( + [], org_id='your_org_id', mist_login_method='token') self.assertEqual(str(cm.exception), 'Must provide site_group_ids for GovWifi & MoJWifi') @@ -56,13 +58,13 @@ def test_juniper_script_missing_rf_template_id(self): # Test when rf_template_id is missing with self.assertRaises(ValueError) as cm: juniper_script_site_creation([], - org_id='your_org_id', - mist_login_method='token', - site_group_ids={ - 'moj_wifi': '0b33c61d-8f51-4757-a14d-29263421a904', - 'gov_wifi': '70f3e8af-85c3-484d-8d90-93e28b911efb'}, - network_template_id='46b87163-abd2-4b08-a67f-1ccecfcfd061' - ) + org_id='your_org_id', + mist_login_method='token', + site_group_ids={ + 'moj_wifi': '0b33c61d-8f51-4757-a14d-29263421a904', + 'gov_wifi': '70f3e8af-85c3-484d-8d90-93e28b911efb'}, + network_template_id='46b87163-abd2-4b08-a67f-1ccecfcfd061' + ) self.assertEqual(str(cm.exception), 'Must define rf_template_id') @@ -106,13 +108,13 @@ def test_juniper_script_missing_network_template_id(self): # Test when network_template_id is missing with self.assertRaises(ValueError) as cm: juniper_script_site_creation([], - org_id='your_org_id', - mist_login_method='token', - site_group_ids={ - 'moj_wifi': '0b33c61d-8f51-4757-a14d-29263421a904', - 'gov_wifi': '70f3e8af-85c3-484d-8d90-93e28b911efb'}, - rf_template_id='46b87163-abd2-4b08-a67f-1ccecfcfd061' - ) + org_id='your_org_id', + mist_login_method='token', + site_group_ids={ + 'moj_wifi': '0b33c61d-8f51-4757-a14d-29263421a904', + 'gov_wifi': '70f3e8af-85c3-484d-8d90-93e28b911efb'}, + rf_template_id='46b87163-abd2-4b08-a67f-1ccecfcfd061' + ) self.assertEqual(str(cm.exception), 'Must define network_template_id') @@ -122,16 +124,16 @@ def test_given_mist_login_method_not_defined_then_default_to_credentials(self, m output_buffer = StringIO() with patch('sys.stdout', new_callable=StringIO) as mock_stdout: juniper_script_site_creation([], - org_id='your_org_id', - site_group_ids={ - 'moj_wifi': '0b33c61d-8f51-4757-a14d-29263421a904', - 'gov_wifi': '70f3e8af-85c3-484d-8d90-93e28b911efb'}, - rf_template_id='46b87163-abd2-4b08-a67f-1ccecfcfd061', - network_template_id='46b87163-abd2-4b08-a67f-1ccecfcfd061', - mist_login_method=None, - ap_versions={"AP45": "0.12.27139", - "AP32": "0.12.27139"} - ) + org_id='your_org_id', + site_group_ids={ + 'moj_wifi': '0b33c61d-8f51-4757-a14d-29263421a904', + 'gov_wifi': '70f3e8af-85c3-484d-8d90-93e28b911efb'}, + rf_template_id='46b87163-abd2-4b08-a67f-1ccecfcfd061', + network_template_id='46b87163-abd2-4b08-a67f-1ccecfcfd061', + mist_login_method=None, + ap_versions={"AP45": "0.12.27139", + "AP32": "0.12.27139"} + ) actual_output = mock_stdout.getvalue() expected_message = "mist_login_method not defined. Defaulting to credentials" self.assertIn(expected_message, actual_output, @@ -144,6 +146,7 @@ def test_juniper_script_missing_org_id(self): self.assertEqual(str(cm.exception), 'Please provide Mist org_id') + class TestAddGeocodingToJson(unittest.TestCase): @patch('src.juniper_site_creation.geocode', side_effect=[ diff --git a/test/test_main.py b/test/test_main.py index fb007ca..9a63390 100644 --- a/test/test_main.py +++ b/test/test_main.py @@ -3,6 +3,7 @@ import tempfile import csv + class TestCsvToJson(unittest.TestCase): def setUp(self):