Skip to content

added json support with the network export functionality. #228

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 49 additions & 17 deletions digi/xbee/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -8864,7 +8864,7 @@ def get_number_devices(self):
"""
return len(self.__devices_list)

def export(self, dir_path=None, name=None, desc=None):
def export(self, dir_path=None, name=None, desc=None, method="xml"):
"""
Exports this network to the given file path.

Expand All @@ -8876,6 +8876,7 @@ def export(self, dir_path=None, name=None, desc=None):
name. If not defined home directory is used.
name (String, optional, default=`None`): Network human readable name.
desc (String, optional, default=`None`): Network description.
method (String, optional, default='xml'): Type of structure to export to. Supports (xml, json)

Returns:
Tuple (Integer, String): Tuple with result (0: success, 1: failure)
Expand All @@ -8884,13 +8885,14 @@ def export(self, dir_path=None, name=None, desc=None):
import datetime
from pathlib import Path

method = method.strip().lower()
date_now = datetime.datetime.now()
if not dir_path:
dir_path = str(Path.home())
if not name:
name = "%s network" % str(self._local_xbee)
file_name = "%s_%s.xnet" % (name.strip().replace(" ", "_"),
date_now.strftime("%m%d%y_%H%M%S"))
file_name = "%s_%s.xnet" % (name.strip().replace(
" ", "_"), date_now.strftime("%m%d%y_%H%M%S"))
file = Path(dir_path, file_name)
try:
if file.exists():
Expand All @@ -8899,22 +8901,52 @@ def export(self, dir_path=None, name=None, desc=None):
except OSError as exc:
return 1, "%s (%d): %s" % (exc.strerror, exc.errno, exc.filename)

from digi.xbee.util.exportutils import generate_network_xml
tree = generate_network_xml(self._local_xbee, date_now=date_now,
name=name, desc=desc)

from digi.xbee.util.exportutils import generate_network_xml, generate_network_json
from zipfile import ZipFile, ZipInfo, ZIP_DEFLATED
try:
with ZipFile(str(file), 'w') as xnet_zip:
info = ZipInfo(filename='network.xml',
date_time=time.localtime(date_now.timestamp()))
info.compress_type = ZIP_DEFLATED
with xnet_zip.open(info, 'w') as xnet_file:
tree.write(xnet_file, encoding='utf8', xml_declaration=False)
except (OSError, IOError) as exc:
return 1, "%s (%d): %s" % (exc.strerror, exc.errno, exc.filename)

return 0, str(file)
if method == "xml":
tree = generate_network_xml(self._local_xbee,
date_now=date_now,
name=name,
desc=desc)

try:
with ZipFile(str(file), 'w') as xnet_zip:
info = ZipInfo(filename='network.xml',
date_time=time.localtime(
date_now.timestamp()))
info.compress_type = ZIP_DEFLATED
with xnet_zip.open(info, 'w') as xnet_file:
tree.write(xnet_file,
encoding='utf8',
xml_declaration=False)
except (OSError, IOError) as exc:
return 1, "%s (%d): %s" % (exc.strerror, exc.errno,
exc.filename)

return 0, str(file)
elif method == "json":
json = generate_network_json(self._local_xbee,
date_now=date_now,
name=name,
desc=desc)
try:
with ZipFile(str(file), 'w') as xnet_zip:
info = ZipInfo(filename='network.json',
date_time=time.localtime(
date_now.timestamp()))
info.compress_type = ZIP_DEFLATED

with xnet_zip.open(info, 'w') as xnet_file:
json.dump(json, xnet_file)
except (OSError, IOError) as exc:
return 1, "%s (%d): %s" % (exc.strerror, exc.errno,
exc.filename)

return 0, str(file)
else:
raise NotImplementedError(
f"`{method}` is not an allowed method for serialization")

def add_network_modified_callback(self, callback):
"""
Expand Down
179 changes: 168 additions & 11 deletions digi/xbee/util/exportutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,167 @@
# Restricted Rights at 48 CFR 52.227-19, as applicable.
#
# Digi International Inc., 9350 Excelsior Blvd., Suite 700, Hopkins, MN 55343
import logging
import datetime
import json

from collections import OrderedDict
from xml.etree.ElementTree import Element, SubElement, ElementTree

from digi.xbee.devices import RemoteZigBeeDevice
from digi.xbee.profile import FirmwareParity
from digi.xbee.util import utils


def generate_network_json(xbee, date_now=None, name=None, desc=None):
"""
Generates the JSON heirarchy representing the network of the given XBee.

Params:
xbee (:class:`.XBeeDevice`): Local Xbee node.
date_now (:class: `datetime.datetime`, option, default=`None`): Date to set in JSON
name (String, optional, default=`None`): Human readable network name.
desc (String, optional, default=`None`): Description of the network.

Return:
:class:`str`: JSON formatted `str`.
"""
network_name = "xbee_network" if name is None else f"{name}_network"
if not date_now:
date_now = datetime.datetime.now()

root = OrderedDict({
network_name:
OrderedDict({
"description": "" if not desc else str(desc),
"date": str(date_now),
"map_type": "dynamic",
"protocol": str(xbee.get_protocol().code),
"devices": _generate_nodes_json(xbee)
})
})

try:
return json.dumps(root, indent=2)
except TypeError:
logging.error("Unable to parse network into valid json")
return None


def _generate_nodes_json(xbee):
"""
Generates a JSON element representing the network of the given XBee.

Params:
xbee (:class:`.XBeeDevice`): Local XBee node.

Return:
:class:`dict`: A list of nodes.
"""
nodes = list()

network = xbee.get_network()
for node in [xbee] + network.get_devices():
nodes.append(_generate_node_json(node))

return nodes


def _generate_node_json(node) -> dict:
"""
Generates a JSON element representing the given XBee node.

Params:
xbee (:class:`.AbstractXBeeDevice`): XBee node.

Return:
:class:`dict`: Generated dict that represents the node.
"""

hw = node.get_hardware_version()
fw = node.get_firmware_version()
addr = str(node.get_64bit_addr())

node_info = OrderedDict({
'addr':
addr,
'nwk_address':
str(node.get_16bit_addr()),
'node_id':
node.get_node_id(),
'role':
node.get_role().description,
'hw_version':
f"{utils.hex_to_string([hw.code], pretty=False) if hw else '???'}",
"fw_version":
f"{utils.hex_to_string(fw,pretty=False) if fw else '???'}"
})

# handle ZiGBee specifics
if isinstance(node, RemoteZigBeeDevice) and node.parent:
node_info['parent_address'] = str(node.parent.get_64bit_addr())

# if node is NOT remote then add serial information
if not node.is_remote():
ser = node.serial_port

node_info['serial_config'] = OrderedDict({
'port':
str(ser.port),
'baud_rate':
str(ser.baudrate),
'data_bits':
str(ser.bytesize),
'stop_bits':
str(ser.stopbits),
'parity':
str(FirmwareParity.get_by_parity(ser.parity).index),
})
# handle flow control based on logic provided in `_generatoe_serial_config_xml`
fc = "0"
if ser.rtscts:
fc = "3"
elif ser.xonxoff:
fc = "12"
node_info['serial_config']['flow_control'] = fc

network = node.get_local_xbee_device().get_network() \
if node.is_remote() else node.get_network()

node_info['connections'] = _generate_connections_json(
node, network.get_node_connections(node))

return node_info


def _generate_connections_json(node, connections):
"""
Generates the XML node representing the given connections.

Params:
xbee (:class:`.AbstractXBeeDevice`): XBee node.
connections (List): List of :class:`.Connection`.

Return:
:class:`dict`: Generated list of Connections.
"""
conn_info = list()
for conn in connections:
end_device = conn.node_b if node == conn.node_a else conn.node_a
addr = str(end_device.get_64bit_addr())
conn_info.append(
OrderedDict({
'addr':
addr,
'strength':
str(conn.lq_a2b if node == conn.node_a else conn.lq_b2a),
'status':
str(conn.status_a2b.id if node ==
conn.node_a else conn.status_b2a.id)
}))
return conn_info


def generate_network_xml(xbee, date_now=None, name=None, desc=None):
"""
Generates the XML hierarchy representing the network of the given XBee.
Expand All @@ -45,7 +197,8 @@ def generate_network_xml(xbee, date_now=None, name=None, desc=None):
if not date_now:
date_now = datetime.datetime.now()

net_node = Element("network", attrib={"name": "%s_network" if not name else name})
net_node = Element("network",
attrib={"name": "%s_network" if not name else name})
net_node.text = "\n" + '\t' * level
desc_node = SubElement(net_node, "description")
desc_node.text = "" if not desc else desc
Expand Down Expand Up @@ -102,7 +255,8 @@ def _generate_node_xml(node, level=0):
Return:
:class:`xml.etree.ElementTree.Element`: Generated XML element.
"""
device_node = Element("device", attrib={"address": str(node.get_64bit_addr())})
device_node = Element("device",
attrib={"address": str(node.get_64bit_addr())})
device_node.text = "\n" + '\t' * level
device_node.tail = "\n" + '\t' * (level - 1)
net_addr = SubElement(device_node, "nwk_address")
Expand Down Expand Up @@ -130,13 +284,15 @@ def _generate_node_xml(node, level=0):
fw_version.tail = "\n" + '\t' * level

if not node.is_remote():
device_node.append(_generate_serial_config_xml(node.serial_port, level + 1))
device_node.append(
_generate_serial_config_xml(node.serial_port, level + 1))

network = node.get_local_xbee_device().get_network() \
if node.is_remote() else node.get_network()

device_node.append(_generate_connections_xml(
node, network.get_node_connections(node), level + 1))
device_node.append(
_generate_connections_xml(node, network.get_node_connections(node),
level + 1))

return device_node

Expand Down Expand Up @@ -168,8 +324,7 @@ def _generate_serial_config_xml(serial_port, level=0):
stop_bits.text = str(serial_port.stopbits)
stop_bits.tail = "\n" + '\t' * level
parity = SubElement(serial_cfg_node, "parity")
parity.text = str(
FirmwareParity.get_by_parity(serial_port.parity).index)
parity.text = str(FirmwareParity.get_by_parity(serial_port.parity).index)
parity.tail = "\n" + '\t' * level
flow_control = SubElement(serial_cfg_node, "flow_control")
# Values used in XCTU and XNA
Expand Down Expand Up @@ -201,16 +356,18 @@ def _generate_connections_xml(node, connections, level=0):
connections_node.tail = "\n" + '\t' * (level - 2)
for conn in connections:
end_device = conn.node_b if node == conn.node_a else conn.node_a
conn_node = SubElement(connections_node, "connection",
attrib={"address": str(end_device.get_64bit_addr())})
conn_node = SubElement(
connections_node,
"connection",
attrib={"address": str(end_device.get_64bit_addr())})
conn_node.text = "\n" + '\t' * (level + 1)
conn_node.tail = "\n" + '\t' * level
conn_lq = SubElement(conn_node, "strength")
conn_lq.text = str(conn.lq_a2b if node == conn.node_a else conn.lq_b2a)
conn_lq.tail = "\n" + '\t' * (level + 1)
conn_status = SubElement(conn_node, "status")
conn_status.text = str(
conn.status_a2b.id if node == conn.node_a else conn.status_b2a.id)
conn_status.text = str(conn.status_a2b.id if node ==
conn.node_a else conn.status_b2a.id)
conn_status.tail = "\n" + '\t' * level

last_conn = connections_node.find("./connection[last()]")
Expand Down