Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ability to assign APs #27

Merged
merged 2 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions example.env
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ SITE_GROUP_IDS={"moj_wifi": "foo","gov_wifi": "bar"}
RF_TEMPLATE_ID=
NETWORK_TEMPLATE_ID=
AP_VERSIONS={"AP45": "0.12.27139", "AP32": "0.12.27139"}
OPERATION=site_creation OR ap_staging

; Optional config options
;MIST_USERNAME=
Expand Down
2 changes: 2 additions & 0 deletions example_ap_list.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
SITE FITS ID,Site Name,Location,Device Type,Make/Model,Host Name,Serial Number,MAC Address, Claim Code, RF Report Reference
FITS_0982,HMP Norwich,"102PF Probation Test Site,not on floorplan",Access Point,AP45,MOJ-NS-1234-WAP001,A164622020622,a8:f7:d9:82:0d:23,A123456789,Blah Blah Blah
28 changes: 28 additions & 0 deletions src/build_juniper_payload.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import json
from datetime import datetime
import sys


class BuildPayload:
Expand Down Expand Up @@ -220,3 +222,29 @@ def _build_site_payload(
json_objects.append(site_dict)

return json_objects


def plan_of_action(
processed_payload
):
# Generate a timestamp for the filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
plan_file_name = "../data_src/mist_plan_{time}.json".format(time=timestamp)

# Load file
with open(plan_file_name, "w") as plan_file:
json.dump(processed_payload, plan_file, indent=2)

# Print to terminal
with open(plan_file_name, "r") as plan_file:
print(plan_file.read())

print("A file containing all the changes has been created: {file}".format(
file=plan_file_name))
user_input = input("Do you wish to continue? (y/n): ").upper()
if user_input == "Y":
print("Continuing with run")
elif user_input == "N":
sys.exit(0)
else:
raise ValueError('Invalid input')
217 changes: 217 additions & 0 deletions src/juniper_ap_staging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
from juniper_client import JuniperClient
from build_juniper_payload import plan_of_action


def validate_if_site_defined_on_csv_exists_in_mist(mist_sites_configs: list['dict'],
ap_csv: list['dict']):
mist_site_names = []
for mist_site_config in mist_sites_configs:
mist_site_names.append(mist_site_config['name'])

mist_site_names_csv_file = []
for ap_row in ap_csv:
mist_site_names_csv_file.append(ap_row['Site Name'])

for site_name in mist_site_names_csv_file:
if site_name not in mist_site_names:
raise ValueError(
f"Site name '{site_name}' found in CSV file but not in mist site configurations.")


def validate_if_ap_defined_on_csv_exists_in_mist(mist_inventory: list['dict'],
ap_csv: list['dict']):
ap_mac_addresses_from_mist = []
for inventory_item in mist_inventory:
if inventory_item['type'] == 'ap':
ap_mac_addresses_from_mist.append(inventory_item['mac'])

ap_mac_addresses_from_csv = []
for ap_row in ap_csv:
cleaned_mac_address = ap_row['MAC Address'].replace(":", "")
ap_mac_addresses_from_csv.append(cleaned_mac_address)

for site_name in ap_mac_addresses_from_csv:
if site_name not in ap_mac_addresses_from_mist:
raise ValueError(
f"Site name '{site_name}' found in CSV file but not in mist site configurations.")


def find_site_by_name(site_name: str,
site_and_mac_reservations: list):
for site in site_and_mac_reservations:
if site["SiteName"] == site_name:
return site
return None


def format_site_and_mac_reservations(ap_csv: list['dict']
) -> list['dict']:
site_and_mac_reservations = []

for ap_row in ap_csv:
site_name = ap_row.get("Site Name", "")
mac_address = ap_row.get("MAC Address", "").replace(":", "")

# Find the existing site in the list
existing_site = find_site_by_name(site_name, site_and_mac_reservations)

if existing_site:
# Update the mac_addresses field by appending the new MAC address
if mac_address not in existing_site["mac_addresses"]:
existing_site["mac_addresses"] += f",{mac_address}"
else:
# Create a new dictionary with the desired keys and values
new_item = {
""
"SiteName": site_name,
"mac_addresses": mac_address
}
# Append the new dictionary to the output list
site_and_mac_reservations.append(new_item)

return site_and_mac_reservations


def find_site_id(
site_and_mac_reservations_without_mist_site_ids: list['dict'],
mist_sites_configs: list['dict']
) -> list['dict']:

for site_csv in site_and_mac_reservations_without_mist_site_ids:
site_found = False
for site_mist in mist_sites_configs:
if site_csv['SiteName'] == site_mist['name']:
site_csv['site_id'] = site_mist['id']
site_found = True
break
if not site_found:
raise ValueError(
f"No matching site_id found for site: {site_csv['SiteName']}")

return site_and_mac_reservations_without_mist_site_ids


def get_site_payload_for_inventory_assign_to_site(
site_and_mac_reservations_including_mist_site_ids: list['dict']
) -> list['dict']:
payload = []

for site in site_and_mac_reservations_including_mist_site_ids:
payload.append(
{
"op": "assign",
"site_id": site['site_id'],
"macs": [
site['mac_addresses']
],
"no_reassign": False,
"disable_auto_config": False,
"managed": False
}
)

return payload


def rename_ap(admin: object,
site_id: str,
ap_id: str,
new_ap_name: str):
api_url = '/api/v1/sites/' + site_id + '/devices/' + ap_id
response = admin.put(api_url, payload={'name': new_ap_name})
if response == False:
print('Failed to rename ap to new desired name {}'.format(new_ap_name))
else:
print('Successfully renamed ap to new desired name {}'.format(new_ap_name))


def build_rename_ap_payload(
inventory_payloads: list['dict'],
mist_inventory: list['dict'],
ap_csv: list['dict']
) -> list['dict']:
payload = []

for site in inventory_payloads:
for mac in site['macs']:
inventory_item = find_inventory_item_by_mac_address(
mist_inventory, mac)
csv_item = find_csv_item_by_mac_address(ap_csv, mac)
payload.append(
{
"new_ap_name": csv_item['Host Name'],
"site_id": site['site_id'],
"ap_id": inventory_item['id'],
}
)

return payload


def find_inventory_item_by_mac_address(
mist_inventory: list['dict'],
mac: str
) -> dict:
for item in mist_inventory:
if item['mac'] == mac:
return item
raise ValueError(f"Unable to find item with Mac Address: '{mac}'")


def find_csv_item_by_mac_address(
ap_csv: list['dict'],
mac: str
) -> list[dict]:
for item in ap_csv:
if item['MAC Address'].replace(":", "") == mac:
return item
raise ValueError(
f"Unable to find item name with Mac Address: '{mac}' in csv")


def juniper_ap_assign(
ap_csv: list['dict'],
org_id=None,
mist_username=None,
mist_login_method=None
):
admin = JuniperClient(mist_username, mist_login_method)
mist_sites_configs = admin.get('/api/v1/orgs/' + org_id + '/sites')
mist_inventory = admin.get('/api/v1/orgs/' + org_id + '/inventory')

validate_if_site_defined_on_csv_exists_in_mist(mist_sites_configs, ap_csv)
validate_if_ap_defined_on_csv_exists_in_mist(mist_inventory, ap_csv)

site_and_mac_reservations_without_mist_site_ids = format_site_and_mac_reservations(
ap_csv
)

site_and_mac_reservations_including_mist_site_ids = find_site_id(
site_and_mac_reservations_without_mist_site_ids,
mist_sites_configs
)

inventory_payloads = get_site_payload_for_inventory_assign_to_site(
site_and_mac_reservations_including_mist_site_ids)

plan_of_action(inventory_payloads)

for site_payload in inventory_payloads:
result = admin.put('/api/v1/orgs/' + org_id +
'/inventory', site_payload)
if result == False:
print('Failed to assign ap {}'.format(site_payload))
else:
print('Successfully assigned ap {}'.format(site_payload))

rename_ap_payload = build_rename_ap_payload(
inventory_payloads, mist_inventory, ap_csv)
plan_of_action(rename_ap_payload)

for ap_item in rename_ap_payload:
rename_ap(
admin,
ap_item['site_id'],
ap_item['ap_id'],
ap_item['new_ap_name']
)
17 changes: 17 additions & 0 deletions src/juniper_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,20 @@ def put(self, url, payload):
return False

return json.loads(response.text)

def get(self, url):
url = 'https://api.eu.mist.com{}'.format(url)
session = self.session
headers = self.headers

print('GET {}'.format(url))
response = session.get(url, headers=headers)

if response.status_code != 200:
print('Failed to GET')
print('\tURL: {}'.format(url))
print('\tResponse: {} ({})'.format(
response.text, response.status_code))
return False

return json.loads(response.text)
65 changes: 30 additions & 35 deletions src/juniper.py → src/juniper_site_creation.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,29 @@
import json
import sys
from datetime import datetime
from build_juniper_payload import BuildPayload
from build_juniper_payload import BuildPayload, plan_of_action
from juniper_client import JuniperClient
from geocode import geocode, find_timezone, find_country_code
import time


def add_geocoding_to_json(data):
for d in data:
# Variables
site_id = None
site = {'name': d.get('Site Name', ''),
'address': d.get('Site Name', '')
}

gps = geocode(d.get('Site Address', ''))
country_code = find_country_code(gps)
time_zone = find_timezone(gps)

# Adding new key-value pairs to the dictionary
d['gps'] = gps
d['country_code'] = country_code
d['time_zone'] = time_zone
time.sleep(1)
return data


def warn_if_using_org_id_production(org_id):
Expand All @@ -23,36 +44,6 @@ def warn_if_using_org_id_production(org_id):
raise ValueError('Invalid input')


def plan_of_action(
payload_processor
):

# Generate a timestamp for the filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
plan_file_name = "../data_src/mist_plan_{time}.json".format(time=timestamp)

processed_payload = payload_processor.get_site_payload()

# Load file
with open(plan_file_name, "w") as plan_file:
json.dump(processed_payload, plan_file, indent=2)

# Print to terminal
with open(plan_file_name, "r") as plan_file:
print(plan_file.read())

print("A file containing all the changes has been created: {file}".format(
file=plan_file_name))
user_input = input("Do you wish to continue? (y/n): ").upper()

if user_input == "Y":
print("Continuing with run")
elif user_input == "N":
sys.exit(0)
else:
raise ValueError('Invalid input')


def validate_user_defined_config_variables(
org_id,
site_group_ids,
Expand All @@ -79,8 +70,8 @@ def validate_user_defined_config_variables(
# Main function


def juniper_script(
data,
def juniper_script_site_creation(
json_data_without_geocoding,
org_id=None,
mist_username=None,
mist_login_method=None,
Expand All @@ -89,6 +80,10 @@ def juniper_script(
network_template_id=None,
ap_versions=None
):

data = add_geocoding_to_json(
json_data_without_geocoding)

# Configure True/False to enable/disable additional logging of the API response objects
show_more_details = True

Expand All @@ -112,7 +107,7 @@ def juniper_script(
)

plan_of_action(
payload_processor
payload_processor.get_site_payload()
)

# Establish Mist session
Expand Down
Loading