-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathconvert_topology.py
298 lines (253 loc) · 11 KB
/
convert_topology.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
"""
Main class of kytos/sdx Kytos Network Application.
SDX API
"""
import os
import re
from typing import ValuesView

from .settings import OVERRIDE_VLAN_RANGE
class ParseConvertTopology:
    """Parse Topology class of kytos/sdx NApp.

    Converts the Kytos topology dict handed to the constructor into the SDX
    topology dict returned by :meth:`parse_convert_topology`, and records the
    Kytos <-> SDX port identifier mappings built along the way.
    """

    # Maximum length applied to every node, port and link name.
    _MAX_NAME_LEN = 30
    # 4294967294 == 0xFFFFFFFE, the reserved OFPP_LOCAL port number in
    # OpenFlow 1.3; get_ports() leaves interfaces with this number out.
    _LOCAL_PORT_NO = 4294967294
    # Name sanitising: whitespace runs become "_", then every character
    # outside the allowed set is dropped.
    _WHITESPACE_RE = re.compile(r"\s+")
    _DISALLOWED_NAME_CHARS_RE = re.compile(r"[^A-Za-z0-9_.,/-]")
    # Port speed (as a string) -> Gbit/s. The numeric keys appear to be bytes
    # per second (1250000000 * 8 == 10 Gbit/s); both the int and the float
    # string renderings are listed because callers pass ``str(speed)``.
    _SPEED_TO_GBPS = {
        "400GE": 400,
        "50000000000": 400,
        "50000000000.0": 400,
        "100GE": 100,
        "12500000000": 100,
        "12500000000.0": 100,
        "50GE": 50,
        "6250000000": 50,
        "6250000000.0": 50,
        "40GE": 40,
        "5000000000": 40,
        "5000000000.0": 40,
        "25GE": 25,
        "3125000000": 25,
        "3125000000.0": 25,
        "10GE": 10,
        "1250000000": 10,
        "1250000000.0": 10,
    }

    def __init__(self, **args):
        """Store the conversion inputs.

        Required keyword arguments: ``topology``, ``version``, ``timestamp``,
        ``oxp_name`` and ``oxp_url``. A missing one raises ``KeyError``.
        """
        self.kytos_topology = args["topology"]
        self.version = args["version"]
        self.timestamp = args["timestamp"]
        self.oxp_name = args["oxp_name"]
        self.oxp_url = args["oxp_url"]
        self.model_version = "2.0.0"
        # override interface vlan range for sdx when no sdx_vlan_range
        # metadata is available
        # NOTE(review): when the environment variable is set this value is a
        # str, and get_port() emits it unchanged as "vlan_range" - confirm
        # that consumers accept a string here.
        self.override_vlan_range = os.environ.get(
            "OVERRIDE_VLAN_RANGE", OVERRIDE_VLAN_RANGE
        )
        # mapping from Kytos to SDX and vice-versa
        self.kytos2sdx = {}
        self.sdx2kytos = {}

    def get_kytos_nodes(self) -> ValuesView[dict]:
        """Return a view over the values of ``topology["switches"]``."""
        return self.kytos_topology["switches"].values()

    def get_kytos_links(self) -> ValuesView[dict]:
        """Return a view over the values of ``topology["links"]``."""
        return self.kytos_topology["links"].values()

    @staticmethod
    def get_link_port_speed(speed: str) -> int:
        """Return the speed in Gbit/s for ``speed``, or 0 when unknown."""
        return ParseConvertTopology._SPEED_TO_GBPS.get(speed, 0)

    @staticmethod
    def get_type_port_speed(speed: str) -> str:
        """Return the port type (e.g. ``"10GE"``), or ``"Other"`` if unknown."""
        gbps = ParseConvertTopology._SPEED_TO_GBPS.get(speed)
        return f"{gbps}GE" if gbps else "Other"

    @staticmethod
    def get_status(status: bool) -> str:
        """Return ``"up"`` for a truthy status, ``"down"`` otherwise."""
        return "up" if status else "down"

    @staticmethod
    def get_state(state: bool) -> str:
        """Return ``"enabled"`` for a truthy state, ``"disabled"`` otherwise."""
        return "enabled" if state else "disabled"

    @classmethod
    def _sanitize_name(cls, raw_name: str) -> str:
        """Return ``raw_name`` reduced to the allowed characters and length."""
        name = cls._WHITESPACE_RE.sub("_", raw_name)
        name = cls._DISALLOWED_NAME_CHARS_RE.sub("", name)
        return name[: cls._MAX_NAME_LEN]

    @staticmethod
    def _split_interface_id(interface_id: str) -> tuple:
        """Split ``"<switch id>:<port>"`` into ``(switch_id, int(port))``."""
        # The port number is whatever follows the last ":"; this matches the
        # fixed-offset slicing for 23-character dpids without hard-coding it.
        switch_id, _, port_no = interface_id.rpartition(":")
        return switch_id, int(port_no)

    @classmethod
    def _node_name_from_switch(cls, switch: dict) -> str:
        """Return the SDX node name for a switch dict.

        Preference order: ``metadata["node_name"]`` (truncated), then
        ``data_path`` when it fits the length limit, then the dpid with
        ``":"`` replaced by ``"-"``.
        """
        metadata = switch["metadata"]
        if "node_name" in metadata:
            return metadata["node_name"][: cls._MAX_NAME_LEN]
        if len(switch["data_path"]) <= cls._MAX_NAME_LEN:
            return switch["data_path"]
        return switch["dpid"].replace(":", "-")

    def get_kytos_link_label(self, kytos_link: dict) -> str:
        """Return the SDX label (name) of a Kytos link.

        Uses the sanitised ``metadata["link_name"]`` when that yields a
        non-empty string; otherwise builds ``<node>/<port>_<node>/<port>``
        from the two endpoints in a canonical order.

        Raises:
            ValueError: if the link is missing/empty, lacks an endpoint, or
                references a switch that is not in the topology.
        """
        if (
            not kytos_link
            or "endpoint_a" not in kytos_link
            or "endpoint_b" not in kytos_link
        ):
            raise ValueError(f"Invalid Kytos link: {kytos_link}")
        link_name = kytos_link["metadata"].get("link_name")
        if link_name:
            label = self._sanitize_name(link_name)
            # A name made only of disallowed characters sanitises to "";
            # fall through to the generated label rather than returning it.
            if label:
                return label
        switch_a, interface_a = self._split_interface_id(
            kytos_link["endpoint_a"]["id"]
        )
        switch_b, interface_b = self._split_interface_id(
            kytos_link["endpoint_b"]["id"]
        )
        node_swa = self.get_kytos_node_name(switch_a)
        node_swb = self.get_kytos_node_name(switch_b)
        # Order by (node, port) so A->B and B->A give the same label.
        if (node_swb, interface_b) < (node_swa, interface_a):
            node_swa, node_swb = node_swb, node_swa
            interface_a, interface_b = interface_b, interface_a
        return f"{node_swa}/{interface_a}_{node_swb}/{interface_b}"

    def get_port_urn(self, interface: dict) -> str:
        """Return the full port URN for a Kytos interface dict."""
        switch_name = self.get_kytos_node_name(interface["switch"])
        port_no = interface["port_number"]
        return f"urn:sdx:port:{self.oxp_url}:{switch_name}:{port_no}"

    def get_port(self, sdx_node_name: str, interface: dict) -> dict:
        """Build the SDX port dict for one Kytos interface.

        Args:
            sdx_node_name: name of the owning SDX node, used in ``node``.
            interface: Kytos interface dict.

        Raises:
            ValueError: if the interface is an NNI whose link is not in the
                topology (see :meth:`get_kytos_link_label`).
        """
        metadata = interface["metadata"]

        port_name = ""
        if metadata.get("port_name"):
            port_name = self._sanitize_name(metadata["port_name"])
        if not port_name:
            # No usable port_name metadata: use the interface's own name.
            port_name = interface["name"][: self._MAX_NAME_LEN]

        if interface["nni"]:
            link_label = self.get_kytos_link_label(
                self.kytos_topology["links"].get(interface["link"])
            )
            nni = f"urn:sdx:link:{self.oxp_url}:{link_label}"
        elif "sdx_nni" in metadata:
            nni = "urn:sdx:port:" + metadata["sdx_nni"]
        else:
            nni = ""

        # VLAN range precedence: interface metadata, then the configured
        # override, then the interface tag_ranges (defaulting to 1-4095).
        vlan_range = metadata.get("sdx_vlan_range")
        if vlan_range is None:
            vlan_range = self.override_vlan_range
        if vlan_range is None:
            vlan_range = interface.get("tag_ranges", [[1, 4095]])
        services = {
            # "l2vpn-ptmp":{"vlan_range": vlan_range}
        }
        if vlan_range:
            services["l2vpn-ptp"] = {"vlan_range": vlan_range}

        return {
            "id": self.get_port_urn(interface),
            "name": port_name,
            "node": f"urn:sdx:node:{self.oxp_url}:{sdx_node_name}",
            "type": self.get_type_port_speed(str(interface["speed"])),
            "status": self.get_status(interface["active"]),
            "state": self.get_state(interface["enabled"]),
            "mtu": metadata.get("mtu", 1500),
            "nni": nni,
            "services": services,
            "private": ["status"],
        }

    def get_ports(self, sdx_node_name: str, interfaces: dict) -> list:
        """Return the SDX ports of a node and record the id mappings.

        Interfaces whose port number is the local port are skipped. For every
        converted port, ``kytos2sdx`` and ``sdx2kytos`` are updated.
        """
        ports = []
        for interface in interfaces.values():
            if interface["port_number"] == self._LOCAL_PORT_NO:
                continue
            sdx_port = self.get_port(sdx_node_name, interface)
            ports.append(sdx_port)
            self.kytos2sdx[interface["id"]] = sdx_port["id"]
            self.sdx2kytos[sdx_port["id"]] = interface["id"]
        return ports

    def get_kytos_node_name(self, switch_id: str) -> str:
        """Return the SDX node name of the switch with id ``switch_id``.

        Raises:
            ValueError: if the switch is not in the topology.
        """
        switch = self.kytos_topology["switches"].get(switch_id)
        if not switch:
            raise ValueError(f"Switch {switch_id} not found on the topology")
        return self._node_name_from_switch(switch)

    def get_sdx_node(self, kytos_node: dict) -> dict:
        """Build the SDX node dict (name, id, location, ports, status, state).

        The node name comes from the same rule used for port URNs, so the
        node id and the ids of its ports always agree.
        """
        metadata = kytos_node["metadata"]
        name = self._node_name_from_switch(kytos_node)
        return {
            "name": name,
            "id": f"urn:sdx:node:{self.oxp_url}:{name}",
            "location": {
                "address": metadata.get("address", ""),
                "latitude": float(metadata.get("lat", 0)),
                "longitude": float(metadata.get("lng", 0)),
                "iso3166_2_lvl4": metadata.get("iso3166_2_lvl4", ""),
                "private": [],
            },
            "ports": self.get_ports(name, kytos_node["interfaces"]),
            "status": self.get_status(kytos_node["active"]),
            "state": self.get_state(kytos_node["enabled"]),
        }

    def get_sdx_nodes(self) -> list:
        """Return the SDX nodes for every enabled Kytos switch."""
        return [
            self.get_sdx_node(kytos_node)
            for kytos_node in self.get_kytos_nodes()
            if kytos_node["enabled"]
        ]

    def get_sdx_link(self, kytos_link):
        """Build the SDX link dict for one Kytos link.

        Bandwidth is looked up from the slower endpoint's speed; it is 0 when
        either endpoint speed is ``None`` or the speed is not a known value.
        """
        link_md = kytos_link["metadata"]
        name = self.get_kytos_link_label(kytos_link)
        endpoint_a = kytos_link["endpoint_a"]
        endpoint_b = kytos_link["endpoint_b"]

        speeds = (endpoint_a["speed"], endpoint_b["speed"])
        if None in speeds:
            # min() cannot compare None with a number; an unknown endpoint
            # speed means the link bandwidth is unknown as well.
            bandwidth = 0
        else:
            bandwidth = self.get_link_port_speed(str(min(speeds)))

        return {
            "name": name,
            "id": f"urn:sdx:link:{self.oxp_url}:{name}",
            "ports": sorted(
                [self.get_port_urn(endpoint_a), self.get_port_urn(endpoint_b)]
            ),
            "type": "intra",
            "bandwidth": bandwidth,
            "residual_bandwidth": link_md.get("residual_bandwidth", 100),
            "latency": link_md.get("latency", 0),
            "packet_loss": link_md.get("packet_loss", 0),
            "availability": link_md.get("availability", 0),
            "status": self.get_status(kytos_link["active"]),
            "state": self.get_state(kytos_link["enabled"]),
            "private": ["packet_loss"],
        }

    def get_sdx_links(self):
        """Return the SDX links for every enabled Kytos link."""
        return [
            self.get_sdx_link(kytos_link)
            for kytos_link in self.get_kytos_links()
            if kytos_link["enabled"]
        ]

    def parse_convert_topology(self):
        """Return the complete SDX topology dict.

        Besides the topology attributes, the result carries the
        ``kytos2sdx`` / ``sdx2kytos`` port id mappings filled while the nodes
        were converted.
        """
        return {
            "name": self.oxp_name,
            "id": f"urn:sdx:topology:{self.oxp_url}",
            "version": self.version,
            "timestamp": self.timestamp,
            "model_version": self.model_version,
            "nodes": self.get_sdx_nodes(),
            "links": self.get_sdx_links(),
            "services": ["l2vpn-ptp"],
            "kytos2sdx": self.kytos2sdx,
            "sdx2kytos": self.sdx2kytos,
        }