diff --git a/digi/xbee/devices.py b/digi/xbee/devices.py index ddcdec7..3258d5b 100755 --- a/digi/xbee/devices.py +++ b/digi/xbee/devices.py @@ -8864,7 +8864,7 @@ def get_number_devices(self): """ return len(self.__devices_list) - def export(self, dir_path=None, name=None, desc=None): + def export(self, dir_path=None, name=None, desc=None, method="xml"): """ Exports this network to the given file path. @@ -8876,6 +8876,7 @@ def export(self, dir_path=None, name=None, desc=None): name. If not defined home directory is used. name (String, optional, default=`None`): Network human readable name. desc (String, optional, default=`None`): Network description. + method (String, optional, default='xml'): Type of structure to export to. Supports (xml, json) Returns: Tuple (Integer, String): Tuple with result (0: success, 1: failure) @@ -8884,13 +8885,14 @@ def export(self, dir_path=None, name=None, desc=None): import datetime from pathlib import Path + method = method.strip().lower() date_now = datetime.datetime.now() if not dir_path: dir_path = str(Path.home()) if not name: name = "%s network" % str(self._local_xbee) - file_name = "%s_%s.xnet" % (name.strip().replace(" ", "_"), - date_now.strftime("%m%d%y_%H%M%S")) + file_name = "%s_%s.xnet" % (name.strip().replace( + " ", "_"), date_now.strftime("%m%d%y_%H%M%S")) file = Path(dir_path, file_name) try: if file.exists(): @@ -8899,22 +8901,52 @@ def export(self, dir_path=None, name=None, desc=None): except OSError as exc: return 1, "%s (%d): %s" % (exc.strerror, exc.errno, exc.filename) - from digi.xbee.util.exportutils import generate_network_xml - tree = generate_network_xml(self._local_xbee, date_now=date_now, - name=name, desc=desc) - + from digi.xbee.util.exportutils import generate_network_xml, generate_network_json from zipfile import ZipFile, ZipInfo, ZIP_DEFLATED - try: - with ZipFile(str(file), 'w') as xnet_zip: - info = ZipInfo(filename='network.xml', - date_time=time.localtime(date_now.timestamp())) - info.compress_type = ZIP_DEFLATED - with xnet_zip.open(info, 'w') as xnet_file: - tree.write(xnet_file, encoding='utf8', xml_declaration=False) - except (OSError, IOError) as exc: - return 1, "%s (%d): %s" % (exc.strerror, exc.errno, exc.filename) - return 0, str(file) + if method == "xml": + tree = generate_network_xml(self._local_xbee, + date_now=date_now, + name=name, + desc=desc) + + try: + with ZipFile(str(file), 'w') as xnet_zip: + info = ZipInfo(filename='network.xml', + date_time=time.localtime( + date_now.timestamp())) + info.compress_type = ZIP_DEFLATED + with xnet_zip.open(info, 'w') as xnet_file: + tree.write(xnet_file, + encoding='utf8', + xml_declaration=False) + except (OSError, IOError) as exc: + return 1, "%s (%d): %s" % (exc.strerror, exc.errno, + exc.filename) + + return 0, str(file) + elif method == "json": + json = generate_network_json(self._local_xbee, + date_now=date_now, + name=name, + desc=desc) + try: + with ZipFile(str(file), 'w') as xnet_zip: + info = ZipInfo(filename='network.json', + date_time=time.localtime( + date_now.timestamp())) + info.compress_type = ZIP_DEFLATED + + with xnet_zip.open(info, 'w') as xnet_file: + json.dump(json, xnet_file) + except (OSError, IOError) as exc: + return 1, "%s (%d): %s" % (exc.strerror, exc.errno, + exc.filename) + + return 0, str(file) + else: + raise NotImplementedError( + f"`{method}` is not an allowed method for serialization") def add_network_modified_callback(self, callback): """ diff --git a/digi/xbee/util/exportutils.py b/digi/xbee/util/exportutils.py index 676889c..59a04f4 100644 --- a/digi/xbee/util/exportutils.py +++ b/digi/xbee/util/exportutils.py @@ -17,8 +17,11 @@ # Restricted Rights at 48 CFR 52.227-19, as applicable. # # Digi International Inc., 9350 Excelsior Blvd., Suite 700, Hopkins, MN 55343 +import logging import datetime +import json +from collections import OrderedDict from xml.etree.ElementTree import Element, SubElement, ElementTree from digi.xbee.devices import RemoteZigBeeDevice @@ -26,6 +29,155 @@ from digi.xbee.util import utils +def generate_network_json(xbee, date_now=None, name=None, desc=None): + """ + Generates the JSON heirarchy representing the network of the given XBee. + + Params: + xbee (:class:`.XBeeDevice`): Local Xbee node. + date_now (:class: `datetime.datetime`, option, default=`None`): Date to set in JSON + name (String, optional, default=`None`): Human readable network name. + desc (String, optional, default=`None`): Description of the network. + + Return: + :class:`str`: JSON formatted `str`. + """ + network_name = "xbee_network" if name is None else f"{name}_network" + if not date_now: + date_now = datetime.datetime.now() + + root = OrderedDict({ + network_name: + OrderedDict({ + "description": "" if not desc else str(desc), + "date": str(date_now), + "map_type": "dynamic", + "protocol": str(xbee.get_protocol().code), + "devices": _generate_nodes_json(xbee) + }) + }) + + try: + return json.dumps(root, indent=2) + except TypeError: + logging.error("Unable to parse network into valid json") + return None + + +def _generate_nodes_json(xbee): + """ + Generates a JSON element representing the network of the given XBee. + + Params: + xbee (:class:`.XBeeDevice`): Local XBee node. + + Return: + :class:`dict`: A list of nodes. + """ + nodes = list() + + network = xbee.get_network() + for node in [xbee] + network.get_devices(): + nodes.append(_generate_node_json(node)) + + return nodes + + +def _generate_node_json(node) -> dict: + """ + Generates a JSON element representing the given XBee node. + + Params: + xbee (:class:`.AbstractXBeeDevice`): XBee node. + + Return: + :class:`dict`: Generated dict that represents the node. + """ + + hw = node.get_hardware_version() + fw = node.get_firmware_version() + addr = str(node.get_64bit_addr()) + + node_info = OrderedDict({ + 'addr': + addr, + 'nwk_address': + str(node.get_16bit_addr()), + 'node_id': + node.get_node_id(), + 'role': + node.get_role().description, + 'hw_version': + f"{utils.hex_to_string([hw.code], pretty=False) if hw else '???'}", + "fw_version": + f"{utils.hex_to_string(fw,pretty=False) if fw else '???'}" + }) + + # handle ZiGBee specifics + if isinstance(node, RemoteZigBeeDevice) and node.parent: + node_info['parent_address'] = str(node.parent.get_64bit_addr()) + + # if node is NOT remote then add serial information + if not node.is_remote(): + ser = node.serial_port + + node_info['serial_config'] = OrderedDict({ + 'port': + str(ser.port), + 'baud_rate': + str(ser.baudrate), + 'data_bits': + str(ser.bytesize), + 'stop_bits': + str(ser.stopbits), + 'parity': + str(FirmwareParity.get_by_parity(ser.parity).index), + }) + # handle flow control based on logic provided in `_generatoe_serial_config_xml` + fc = "0" + if ser.rtscts: + fc = "3" + elif ser.xonxoff: + fc = "12" + node_info['serial_config']['flow_control'] = fc + + network = node.get_local_xbee_device().get_network() \ + if node.is_remote() else node.get_network() + + node_info['connections'] = _generate_connections_json( + node, network.get_node_connections(node)) + + return node_info + + +def _generate_connections_json(node, connections): + """ + Generates the XML node representing the given connections. + + Params: + xbee (:class:`.AbstractXBeeDevice`): XBee node. + connections (List): List of :class:`.Connection`. + + Return: + :class:`dict`: Generated list of Connections. + """ + conn_info = list() + for conn in connections: + end_device = conn.node_b if node == conn.node_a else conn.node_a + addr = str(end_device.get_64bit_addr()) + conn_info.append( + OrderedDict({ + 'addr': + addr, + 'strength': + str(conn.lq_a2b if node == conn.node_a else conn.lq_b2a), + 'status': + str(conn.status_a2b.id if node == + conn.node_a else conn.status_b2a.id) + })) + return conn_info + + def generate_network_xml(xbee, date_now=None, name=None, desc=None): """ Generates the XML hierarchy representing the network of the given XBee. @@ -45,7 +197,8 @@ def generate_network_xml(xbee, date_now=None, name=None, desc=None): if not date_now: date_now = datetime.datetime.now() - net_node = Element("network", attrib={"name": "%s_network" if not name else name}) + net_node = Element("network", + attrib={"name": "%s_network" if not name else name}) net_node.text = "\n" + '\t' * level desc_node = SubElement(net_node, "description") desc_node.text = "" if not desc else desc @@ -102,7 +255,8 @@ def _generate_node_xml(node, level=0): Return: :class:`xml.etree.ElementTree.Element`: Generated XML element. """ - device_node = Element("device", attrib={"address": str(node.get_64bit_addr())}) + device_node = Element("device", + attrib={"address": str(node.get_64bit_addr())}) device_node.text = "\n" + '\t' * level device_node.tail = "\n" + '\t' * (level - 1) net_addr = SubElement(device_node, "nwk_address") @@ -130,13 +284,15 @@ def _generate_node_xml(node, level=0): fw_version.tail = "\n" + '\t' * level if not node.is_remote(): - device_node.append(_generate_serial_config_xml(node.serial_port, level + 1)) + device_node.append( + _generate_serial_config_xml(node.serial_port, level + 1)) network = node.get_local_xbee_device().get_network() \ if node.is_remote() else node.get_network() - device_node.append(_generate_connections_xml( - node, network.get_node_connections(node), level + 1)) + device_node.append( + _generate_connections_xml(node, network.get_node_connections(node), + level + 1)) return device_node @@ -168,8 +324,7 @@ def _generate_serial_config_xml(serial_port, level=0): stop_bits.text = str(serial_port.stopbits) stop_bits.tail = "\n" + '\t' * level parity = SubElement(serial_cfg_node, "parity") - parity.text = str( - FirmwareParity.get_by_parity(serial_port.parity).index) + parity.text = str(FirmwareParity.get_by_parity(serial_port.parity).index) parity.tail = "\n" + '\t' * level flow_control = SubElement(serial_cfg_node, "flow_control") # Values used in XCTU and XNA @@ -201,16 +356,18 @@ def _generate_connections_xml(node, connections, level=0): connections_node.tail = "\n" + '\t' * (level - 2) for conn in connections: end_device = conn.node_b if node == conn.node_a else conn.node_a - conn_node = SubElement(connections_node, "connection", - attrib={"address": str(end_device.get_64bit_addr())}) + conn_node = SubElement( + connections_node, + "connection", + attrib={"address": str(end_device.get_64bit_addr())}) conn_node.text = "\n" + '\t' * (level + 1) conn_node.tail = "\n" + '\t' * level conn_lq = SubElement(conn_node, "strength") conn_lq.text = str(conn.lq_a2b if node == conn.node_a else conn.lq_b2a) conn_lq.tail = "\n" + '\t' * (level + 1) conn_status = SubElement(conn_node, "status") - conn_status.text = str( - conn.status_a2b.id if node == conn.node_a else conn.status_b2a.id) + conn_status.text = str(conn.status_a2b.id if node == + conn.node_a else conn.status_b2a.id) conn_status.tail = "\n" + '\t' * level last_conn = connections_node.find("./connection[last()]")