Skip to content

Commit

Permalink
Merge branch 'development' into Fix_#833
Browse files Browse the repository at this point in the history
  • Loading branch information
ebroecker authored Jan 24, 2025
2 parents f67419a + ab25b17 commit c3c0280
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 102 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
# os: [ubuntu-latest]
experimental: [false]
# python-version: ["2.7","3.4","3.5","3.6","3.7","3.8","3.9","3.10","3.11","3.12"]
python-version: ["3.7","3.8","3.9","3.10","3.11","3.12"]
python-version: ["3.8","3.9","3.10","3.11","3.12", "3.13"]
fail-fast: false
steps:
- uses: actions/checkout@v4
Expand Down
1 change: 0 additions & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ skip_branch_with_pr: true

environment:
matrix:
- TOXENV: py37
- TOXENV: py38
- TOXENV: py39
- TOXENV: py310
Expand Down
4 changes: 3 additions & 1 deletion src/canmatrix/formats/dbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ def dump(in_db, f, **options):
# remove "-" from frame names
if compatibility:
frame.name = re.sub("[^A-Za-z0-9]", whitespace_replacement, frame.name)
if frame.name[0].isdigit():
frame.name = "_" + frame.name

duplicate_signal_totals = collections.Counter(normalized_names.values())
duplicate_signal_counter = collections.Counter() # type: typing.Counter[str]
Expand Down Expand Up @@ -459,7 +461,7 @@ def dump(in_db, f, **options):
if frame.is_complex_multiplexed:
for signal in frame.signals:
if signal.muxer_for_signal is not None:
f.write(("SG_MUL_VAL_ %d %s %s " % (frame.arbitration_id.to_compound_integer(), signal.name, signal.muxer_for_signal)).encode(dbc_export_encoding, ignore_encoding_errors))
f.write(("SG_MUL_VAL_ %d %s %s " % (frame.arbitration_id.to_compound_integer(), output_names[frame][signal], signal.muxer_for_signal)).encode(dbc_export_encoding, ignore_encoding_errors))
f.write((", ".join(["%d-%d" % (a, b) for a, b in signal.mux_val_grp])).encode(dbc_export_encoding, ignore_encoding_errors))

f.write(";\n".encode(dbc_export_encoding, ignore_encoding_errors))
Expand Down
295 changes: 196 additions & 99 deletions src/canmatrix/formats/eds.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import canopen.objectdictionary.eds
import canopen.objectdictionary.datatypes
import codecs

import copy
import re
import math

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -33,129 +35,224 @@
0x82: 'Reset Communication'}


def get_signals(parent_object, signal_receiver):
signals = []
position = 0
for sub in range(1, len(parent_object)):
name = parent_object[sub].name
size = datatype_mapping[parent_object[sub].data_type][1]
unsigned = "UNSIGNED" in datatype_mapping[parent_object[sub].data_type][0]
signal = canmatrix.Signal(name = name, receivers=[signal_receiver], size=size, start_bit = position, is_signed = not unsigned)
position += size
signals.append(signal)
return signals
def name_cleanup(in_str):
rets_str = re.sub("[^A-Za-z0-9]", '_', in_str)
return rets_str

def get_bit_length(data_type_code):
return datatype_mapping[data_type_code][1]
def get_data_type_name(data_type_code):
return datatype_mapping[data_type_code][0]

def format_index(index, subindex):
return f"Index: 0x{index:04X}{subindex:02X}"

def load(f, **options): # type: (typing.IO, **typing.Any) -> canmatrix.CanMatrix
eds_import_encoding = options.get("edsImportEncoding", 'iso-8859-1')
node_id = options.get("eds_node_id", 1)
node_id = options.get("eds_node_id", 0)
generic = options.get("generic", False)
fp = codecs.getreader(eds_import_encoding)(f)
od = canopen.objectdictionary.eds.import_eds(fp, node_id)
db = canmatrix.CanMatrix()
signal_group_counter = 1

node_name = od.device_information.product_name
if len(node_name) == 0:
node_name = "DUMMY"
plc_name = "PLC"
if generic is True:
nm_out = canmatrix.canmatrix.Frame(name="NMT_Out_Request", size=2, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0), transmitters=[plc_name])
sig_cmd = canmatrix.canmatrix.Signal(name="nmt_CMD", size=8, start_bit = 0, receivers=[node_name])
for val, val_name in cmd_values.items():
sig_cmd.add_values(val, val_name)
nm_out.add_signal(sig_cmd)
nm_out.add_signal(canmatrix.canmatrix.Signal(name="Node_ID", size=8, start_bit = 8, receivers=[node_name]))
db.add_frame(nm_out)

nm_responde = canmatrix.canmatrix.Frame(name="NMT_Response_Frame_In", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x700+node_id), transmitters=[node_name])
response_sig1 = canmatrix.canmatrix.Signal(name="NMT_Response_1", size=32, start_bit = 0, receivers=[plc_name])
nm_responde.add_signal(response_sig1)
response_sig2 = canmatrix.canmatrix.Signal(name="NMT_Response_1", size=32, start_bit = 32, receivers=[plc_name])
nm_responde.add_signal(response_sig2)
db.add_frame(nm_responde)

sync = canmatrix.canmatrix.Frame(name="SYNC", size=0, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x80), transmitters=[plc_name])
db.add_frame(sync)

emcy = canmatrix.canmatrix.Frame(name="EMCY", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x80+node_id), transmitters=[node_name])
emcy.add_signal(canmatrix.canmatrix.Signal(name="EMCY_Error_Code", size=16, start_bit=0, receivers=[plc_name]))
emcy.add_signal(canmatrix.canmatrix.Signal(name="E_Reg", size=8, start_bit=16, receivers=[plc_name]))
emcy.add_signal(canmatrix.canmatrix.Signal(name="E_Number", size=8, start_bit=24, receivers=[plc_name]))
db.add_frame(emcy)

nm_out = canmatrix.canmatrix.Frame(name="NMT_Out_Request", size=2, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0), transmitters=[plc_name])
sig_cmd = canmatrix.canmatrix.Signal(name="nmt_CMD", size=8, start_bit = 0, receivers=[node_name])
for val, val_name in cmd_values.items():
sig_cmd.add_values(val, val_name)
nm_out.add_signal(sig_cmd)
nm_out.add_signal(canmatrix.canmatrix.Signal(name="Node_ID", size=8, start_bit = 8, receivers=[node_name]))
db.add_frame(nm_out)

nm_responde = canmatrix.canmatrix.Frame(name="NMT_Response_Frame_In", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x700+node_id), transmitters=[node_name])
response_sig1 = canmatrix.canmatrix.Signal(name="NMT_Response_1", size=32, start_bit = 0, receivers=[plc_name])
nm_responde.add_signal(response_sig1)
response_sig2 = canmatrix.canmatrix.Signal(name="NMT_Response_1", size=32, start_bit = 32, receivers=[plc_name])
nm_responde.add_signal(response_sig2)
db.add_frame(nm_responde)

sync = canmatrix.canmatrix.Frame(name="SYNC", size=0, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x80), transmitters=[plc_name])
db.add_frame(sync)

emcy = canmatrix.canmatrix.Frame(name="EMCY", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x80+node_id), transmitters=[node_name])
emcy.add_signal(canmatrix.canmatrix.Signal(name="EMCY_Error_Code", size=16, start_bit=0, receivers=[plc_name]))
emcy.add_signal(canmatrix.canmatrix.Signal(name="E_Reg", size=8, start_bit=16, receivers=[plc_name]))
emcy.add_signal(canmatrix.canmatrix.Signal(name="E_Number", size=8, start_bit=24, receivers=[plc_name]))
db.add_frame(emcy)

sdo_down = canmatrix.canmatrix.Frame(name="SDO_download", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x600+node_id), transmitters=[node_name])
sig_cmd = canmatrix.canmatrix.Signal(name="sdo_down_CMD", size=8, start_bit=0, receivers=[plc_name])
sdo_down = canmatrix.canmatrix.Frame(name="SDO_receive", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x600+node_id), transmitters=[node_name])
sig_cmd = canmatrix.canmatrix.Signal(name="CCS", size=3, start_bit=5, receivers=[plc_name], is_signed=False)
sig_cmd.is_multiplexer = True
sdo_down.is_complex_multiplexed = True
sig_cmd.multiplex = "Multiplexor"
sdo_down.add_signal(sig_cmd)
sdo_down.add_signal(canmatrix.canmatrix.Signal(name="sdo_down_IDX", size=16, start_bit=8, receivers=[plc_name]))
sdo_down.add_signal(canmatrix.canmatrix.Signal(name="sdo_down_SUBIDX", size=8, start_bit=24, receivers=[plc_name]))
sdo_down.add_signal(canmatrix.canmatrix.Signal(name="data8", size=8, start_bit=32, receivers=[plc_name], multiplex=0x2f))
sig_cmd.add_values(0x2f, "8_bytes")
sdo_down.add_signal(canmatrix.canmatrix.Signal(name="data16", size=16, start_bit=32, receivers=[plc_name], multiplex=0x2b))
sig_cmd.add_values(0x2b, "16_bytes")
sdo_down.add_signal(canmatrix.canmatrix.Signal(name="data24", size=24, start_bit=32, receivers=[plc_name], multiplex=0x27))
sig_cmd.add_values(0x27, "3_bytes")
sdo_down.add_signal(canmatrix.canmatrix.Signal(name="data32", size=32, start_bit=32, receivers=[plc_name], multiplex=0x23))
sig_cmd.add_values(0x23, "4_bytes")
sig_cmd.add_values(0x40, "upload_request")
index = canmatrix.canmatrix.Signal(name="IDX", size=24, start_bit=8, receivers=[plc_name])
index.multiplex = "Multiplexor"
index.is_multiplexer = True
index.mux_val = 1
index.mux_val_grp.append([ 2, 2])
index.muxer_for_signal = "CCS"
sdo_down.add_signal(index)
db.add_frame(sdo_down)

sdo_up = canmatrix.canmatrix.Frame(name="SDO_upload", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x580+node_id), transmitters=[plc_name])
sig_cmd = canmatrix.canmatrix.Signal(name="sdo_state", size=8, start_bit=0, receivers=[node_name])
sdo_up = canmatrix.canmatrix.Frame(name="SDO_transmit", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x580+node_id), transmitters=[plc_name])
sig_cmd = canmatrix.canmatrix.Signal(name="SCS", size=3, start_bit=5, is_signed=False)
sig_cmd.is_multiplexer = True
sdo_up.is_complex_multiplexed = True
sig_cmd.multiplex = "Multiplexor"
sdo_up.add_signal(sig_cmd)
sdo_up.add_signal(canmatrix.canmatrix.Signal(name="sdo_uo_IDX", size=16, start_bit=8, receivers=[node_name]))
sdo_up.add_signal(canmatrix.canmatrix.Signal(name="sdo_up_SUBIDX", size=8, start_bit=24, receivers=[node_name]))
sdo_up.add_signal(canmatrix.canmatrix.Signal(name="error_code", size=32, start_bit=32, receivers=[node_name], multiplex=0x80))
sig_cmd.add_values(0x80, "upload_error")

sdo_up.add_signal(canmatrix.canmatrix.Signal(name="data8", size=8, start_bit=32, receivers=[plc_name], multiplex=0x4f))
sig_cmd.add_values(0x2f, "8_bytes")
sdo_up.add_signal(canmatrix.canmatrix.Signal(name="data16", size=16, start_bit=32, receivers=[plc_name], multiplex=0x4b))
sig_cmd.add_values(0x2b, "16_bytes")
sdo_up.add_signal(canmatrix.canmatrix.Signal(name="data24", size=24, start_bit=32, receivers=[plc_name], multiplex=0x47))
sig_cmd.add_values(0x27, "3_bytes")
sdo_down.add_signal(canmatrix.canmatrix.Signal(name="data32", size=32, start_bit=32, receivers=[plc_name], multiplex=0x43))
sig_cmd.add_values(0x23, "4_bytes")

index = canmatrix.canmatrix.Signal(name="IDX", size=24, start_bit=8)
index.multiplex = "Multiplexor"
index.is_multiplexer = True
index.mux_val = 2
index.mux_val_grp.append([ 2, 2])
index.muxer_for_signal = "SCS"
sdo_up.add_signal(index)
db.add_frame(sdo_up)


# RX Can-Ids ...
for index in range(0x1400, 0x1408):
if index in od:
# store canid in object...
od[index+0x200].canid = od[index][1].default
for obj in od.values():
if isinstance(obj, canopen.objectdictionary.ODVariable):
subindex = 0
combined_value = int(f"{subindex:02X}{obj.index:04X}", 16)
signal_name = name_cleanup(obj.name)
size = get_bit_length(obj.data_type)
if size == 0:
logger.info("Ignoring " + signal_name + " size 0")
continue
new_sig = canmatrix.canmatrix.Signal(name=signal_name, size=size, start_bit=32, receivers=[plc_name])
datatype_name = get_data_type_name(obj.data_type)
if "UNSIGNED" in datatype_name:
new_sig.is_signed = False
new_sig.mux_val = combined_value
new_sig.mux_val_grp.append([ combined_value, combined_value])
new_sig.muxer_for_signal = "IDX"
sdo_down.add_signal(new_sig)
up_sig = copy.deepcopy(new_sig)
up_sig.muxer_for_signal = "IDX"

##RX PDOs
for index in range(0x1600, 0x1608):
if index in od:
pdo_name = od[index].name.replace(" ", "_")
frame = canmatrix.canmatrix.Frame(name=pdo_name, transmitters=[plc_name])
db.add_frame(frame)
frame_id = od[index].canid
up_sig.receivers = []
sdo_up.add_signal(up_sig)
elif isinstance(obj, canopen.objectdictionary.ODRecord):
members = []
for subobj in obj.values():
combined_value = int(f"{subobj.subindex:02X}{obj.index:04X}", 16)
signal_name = name_cleanup(subobj.name)
size = get_bit_length(subobj.data_type)
if size == 0:
logger.info("Ignoring " + signal_name + " size 0")
continue

new_sig = canmatrix.canmatrix.Signal(name=signal_name, size=size, start_bit=32, receivers=[plc_name])
datatype_name = get_data_type_name(subobj.data_type)
if "UNSIGNED" in datatype_name:
new_sig.is_signed = False
new_sig.mux_val = combined_value
new_sig.mux_val_grp.append([ combined_value, combined_value])
new_sig.muxer_for_signal = "IDX"
sdo_down.add_signal(new_sig)
up_sig = copy.deepcopy(new_sig)
up_sig.muxer_for_signal = "IDX"

up_sig.receivers = []
sdo_up.add_signal(up_sig)
if len(members) > 0:
sdo_down.add_signal_group("SG_R_" + name_cleanup(obj.name), signal_group_counter, members)
signal_group_counter += 1

elif isinstance(obj, canopen.objectdictionary.ODArray):
members = []
for subobj in obj.values():
combined_value = int(f"{subobj.subindex:02X}{obj.index:04X}", 16)
signal_name = name_cleanup(subobj.name)
size = get_bit_length(subobj.data_type)
if size == 0:
logger.info("Ignoring " + signal_name + " size 0")
continue

new_sig = canmatrix.canmatrix.Signal(name=signal_name, size=size, start_bit=32, receivers=[plc_name])
datatype_name = get_data_type_name(subobj.data_type)
if "UNSIGNED" in datatype_name:
new_sig.is_signed = False
new_sig.mux_val = combined_value
new_sig.mux_val_grp.append([ combined_value, combined_value])
new_sig.muxer_for_signal = "IDX"
sdo_down.add_signal(new_sig)
members.append(signal_name)
up_sig = copy.deepcopy(new_sig)
up_sig.muxer_for_signal = "IDX"
up_sig.receivers = []
sdo_up.add_signal(up_sig)
if len(members) > 0:
sdo_down.add_signal_group("SG_A_" + name_cleanup(obj.name), signal_group_counter, members)
signal_group_counter += 1


for start_index, rx_tx_config in {0x1400 : {"transmitter": [], "receiver": [node_name]}, 0x1800: {"transmitter": [node_name], "receiver": []}}.items():
for comm_index in range(start_index, start_index + 0x8):
map_index = comm_index + 0x200
if comm_index not in od or map_index not in od:
continue

# Retrieve the COB-ID
comm_param = od[comm_index] #od.get(comm_index)
cob_id_entry = comm_param.get(1) if comm_param else None
if not cob_id_entry or cob_id_entry.default is None:
# print(f" Warning: No valid COB-ID found for {pdo_type} PDO at index 0x{comm_index:04X}. Skipping.")
continue
cob_id = cob_id_entry.default & 0x7FF
pdo_name = name_cleanup(od[comm_index].name)
frame = canmatrix.canmatrix.Frame(name=pdo_name, transmitters=rx_tx_config["transmitter"])
frame_id = cob_id
frame.arbitration_id = canmatrix.ArbitrationId(id=frame_id)
frame.size = 8
signals = get_signals(od[index], node_name)
for sig in signals:
frame.add_signal(sig)

# RT Can-Ids ...
for index in range(0x1800, 0x1808):
if index in od:
# store canid in object...
od[index+0x200].canid = od[index][1].default - 0x40000000

#TX
for index in range(0x1A00, 0x1A08):
if index in od:
frame = canmatrix.canmatrix.Frame(name=pdo_name, transmitters=[node_name])
db.add_frame(frame)
pdo_name = od[index].name.replace(" ", "_")
frame_id = od[index].canid
frame.arbitration_id = canmatrix.ArbitrationId(id=frame_id)
frame.size = 8
signals = get_signals(od[index], plc_name)
for sig in signals:
frame.add_signal(sig)
mapping_param = od.get(map_index)
if not mapping_param:
# print(f" Warning: No mapping parameter found for {pdo_type} PDO at index 0x{map_index:04X}.")
continue
num_entries = mapping_param[0].default if 0 in mapping_param else 0
current_bit_start = 0
for subindex in range(1, num_entries + 1):
mapping_entry = mapping_param.get(subindex)
if not mapping_entry or mapping_entry.default is None:
#print(f" Warning: Subindex {subindex} missing for mapping parameter at 0x{map_index:04X}.")
continue

# Decode the mapping entry
mapping_value = mapping_entry.default
obj_index = (mapping_value >> 16) & 0xFFFF
obj_subindex = (mapping_value >> 8) & 0xFF
bit_length = mapping_value & 0xFF

# Fetch the mapped object
mapped_obj = od.get_variable(obj_index, obj_subindex)
if not mapped_obj:
#print(f" Warning: Could not find object at Index: 0x{obj_index:04X}, Subindex: {obj_subindex}.")
current_bit_start += bit_length
continue
signal_name = name_cleanup(mapped_obj.name)
new_sig = canmatrix.Signal(signal_name, size=bit_length, start_bit=current_bit_start)
datatype_name = get_data_type_name(mapping_entry.data_type)
if "UNSIGNED" in datatype_name:
new_sig.is_signed = False
new_sig.factor = mapped_obj.factor
if mapped_obj.min is not None:
new_sig.min = mapped_obj.min
new_sig.offset = mapped_obj.min
if mapped_obj.max is not None:
new_sig.max = mapped_obj.max
new_sig.receivers = rx_tx_config["receiver"]
frame.add_signal(new_sig)
current_bit_start += bit_length
frame.size = math.ceil(current_bit_start/8)

db.update_ecu_list()
for ecu in db.ecus:
db.rename_ecu(ecu.name, name_cleanup(ecu.name))
return db

0 comments on commit c3c0280

Please sign in to comment.