Skip to content

Commit

Permalink
fixes functions and adds cli support
Browse files Browse the repository at this point in the history
- implements cli
- fixes get_bgp_neighbors napalm-automation-community#44
- fixes get_lldp_neighbors(multiple neighbors)
  • Loading branch information
marcosvbuzo committed Aug 12, 2021
1 parent a5d3b94 commit 72ed825
Showing 1 changed file with 117 additions and 89 deletions.
206 changes: 117 additions & 89 deletions napalm_vyos/vyos.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,86 +472,99 @@ def get_bgp_neighbors(self):
"""

output = self.device.send_command("show ip bgp summary")
output = output.split("\n")

match = re.search(r".* router identifier (\d+\.\d+\.\d+\.\d+), local AS number (\d+)",
output[0])
match = re.search(
r"bgp router identifier (\d+\.\d+\.\d+\.\d+), local as number (\d+) vrf-id (\d+)",
output,
re.MULTILINE | re.IGNORECASE
)
if not match:
return {}

router_id = match.group(1)
local_as = int(match.group(2))

bgp_neighbor_data = dict()
bgp_neighbor_data["global"] = dict()
bgp_neighbor_data["global"]["router_id"] = router_id
bgp_neighbor_data["global"]["peers"] = {}

re_neighbor = re.compile(
'^([\w\d:\.]+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\S+)\s+(\w+)'
)
for bgp_line in output.split("\n"):
match = re_neighbor.search(bgp_line)
if not match:
continue

peer_id = match.group(1)
bgp_version = match.group(2)
remote_as = match.group(3)
msg_rcvd = match.group(4)
msg_sent = match.group(5)
table_version = match.group(6)
in_queue = match.group(7)
out_queue = match.group(8)
up_time = match.group(9)
state_prefix = match.group(10)

is_enabled = "(Admin)" not in state_prefix

received_prefixes = None

try:
state_prefix = int(state_prefix)
received_prefixes = int(state_prefix)
is_up = True
except ValueError:
state_prefix = -1
received_prefixes = -1
is_up = False

if bgp_version == "4":
address_family = "ipv4"
elif bgp_version == "6":
address_family = "ipv6"
else:
raise ValueError("BGP neighbor parsing failed")

"""
'show ip bgp neighbors 192.168.1.1' output example:
BGP neighbor is 192.168.1.1, remote AS 64519, local AS 64520, external link
BGP version 4, remote router ID 192.168.1.1
For address family: IPv4 Unicast
~~~
Community attribute sent to this neighbor(both)
1 accepted prefixes
~~~
"""
bgp_detail = self.device.send_command("show ip bgp neighbors %s" % peer_id)

match_rid = re.search(r"remote router ID (\d+\.\d+\.\d+\.\d+).*", bgp_detail)
remote_rid = match_rid.group(1)

match_prefix_accepted = re.search(r"(\d+) accepted prefixes", bgp_detail)
accepted_prefixes = match_prefix_accepted.group(1)

bgp_neighbor_data["global"]["peers"].setdefault(peer_id, {})
peer_dict = {
"description": "",
"is_enabled": bool(is_enabled),
"local_as": int(local_as),
"is_up": bool(is_up),
"remote_id": remote_rid,
"uptime": int(self._bgp_time_conversion(up_time)),
"remote_as": int(remote_as)
}

# delete the header and empty element
bgp_info = [i.strip() for i in output[6:-2] if i]

for i in bgp_info:
if len(i) > 0:
peer_id, bgp_version, remote_as, msg_rcvd, msg_sent, table_version, \
in_queue, out_queue, up_time, state_prefix = i.split()

is_enabled = "(Admin)" not in state_prefix

received_prefixes = None

try:
state_prefix = int(state_prefix)
received_prefixes = int(state_prefix)
is_up = True
except ValueError:
state_prefix = -1
received_prefixes = -1
is_up = False

if bgp_version == "4":
address_family = "ipv4"
elif bgp_version == "6":
address_family = "ipv6"
else:
raise ValueError("BGP neighbor parsing failed")

"""
'show ip bgp neighbors 192.168.1.1' output example:
BGP neighbor is 192.168.1.1, remote AS 64519, local AS 64520, external link
BGP version 4, remote router ID 192.168.1.1
For address family: IPv4 Unicast
~~~
Community attribute sent to this neighbor(both)
1 accepted prefixes
~~~
"""
bgp_detail = self.device.send_command("show ip bgp neighbors %s" % peer_id)

match_rid = re.search(r"remote router ID (\d+\.\d+\.\d+\.\d+).*", bgp_detail)
remote_rid = match_rid.group(1)

match_prefix_accepted = re.search(r"(\d+) accepted prefixes", bgp_detail)
accepted_prefixes = match_prefix_accepted.group(1)

bgp_neighbor_data["global"]["peers"].setdefault(peer_id, {})
peer_dict = {
"description": "",
"is_enabled": bool(is_enabled),
"local_as": int(local_as),
"is_up": bool(is_up),
"remote_id": remote_rid,
"uptime": int(self._bgp_time_conversion(up_time)),
"remote_as": int(remote_as)
}

af_dict = dict()
af_dict[address_family] = {
"sent_prefixes": int(-1),
"accepted_prefixes": int(accepted_prefixes),
"received_prefixes": int(received_prefixes)
}
af_dict = dict()
af_dict[address_family] = {
"sent_prefixes": int(-1),
"accepted_prefixes": int(accepted_prefixes),
"received_prefixes": int(received_prefixes)
}

peer_dict["address_family"] = af_dict
bgp_neighbor_data["global"]["peers"][peer_id] = peer_dict
peer_dict["address_family"] = af_dict
bgp_neighbor_data["global"]["peers"][peer_id] = peer_dict

return bgp_neighbor_data

Expand Down Expand Up @@ -586,27 +599,28 @@ def _bgp_time_conversion(self, bgp_uptime):
return uptime

def get_lldp_neighbors(self):
# Multiple neighbors per port are not implemented
# The show lldp neighbors commands lists port descriptions, not IDs
ret = []
output = self.device.send_command("show lldp neighbors detail")
pattern = r'''(?s)Interface: +(?P<interface>\S+), [^\n]+
.+?
+SysName: +(?P<hostname>\S+)
.+?
+PortID: +ifname (?P<port>\S+)'''

def _get_interface(match):
return [
{
'hostname': match.group('hostname'),
'port': match.group('port'),
}
]

return {
match.group('interface'): _get_interface(match)
for match in re.finditer(pattern, output)
}
regex = '{},.+?{}.+?{}'.format(
"Interface:\s+(\S+)",
"SysName:\s+(\S+)",
"PortID:\s+ifname (\S+)"
)
re_lldp = re.compile(
regex,
re.MULTILINE|re.DOTALL
)
match = re_lldp.findall(output)
if not match:
return []

for ocurrence in match:
ret.append({
'hostname' : ocurrence[0],
'port' : ocurrence[1],
'interface' : ocurrence[2]
})
return ret

def get_interfaces_counters(self):
# 'rx_unicast_packet', 'rx_broadcast_packets', 'tx_unicast_packets',
Expand Down Expand Up @@ -958,3 +972,17 @@ def _get_running_config(self, sanitized):
config = config[:config.rfind('\n')]
self.device.exit_config_mode()
return config

def cli(self, commands):
cli_output = {}

if not isinstance(commands, list):
raise TypeError("Please enter a valid list of commands!")

for command in commands:
try:
cli_output[command] = self.device.send_command(command)
except Exception:
cli_output[command] = 'error running command'

return cli_output

0 comments on commit 72ed825

Please sign in to comment.