diff --git a/tests/test_tuya_power.py b/tests/test_tuya_power.py index b0f5a525db..0e44956b8b 100644 --- a/tests/test_tuya_power.py +++ b/tests/test_tuya_power.py @@ -88,3 +88,530 @@ async def test_ts0601_power_converter(zigpy_device_from_v2_quirk, msg, expected_ assert status == foundation.Status.SUCCESS assert tuya_manufacturer.get("power") == expected_power + + +@pytest.mark.parametrize( + "manufacturer,power_a_msg,flow_a_msg,power_b_msg,flow_b_msg,expected_power_a,expected_power_b,expected_total", + [ + # Forward flow for both CTs - _TZE204 model + ( + "_TZE204_81yrt3lo", + b"\x09\x1f\x02\x00\x04\x65\x02\x00\x04\x00\x00\x03\xe8", # DP 101: power_a = 1000 + b"\x09\x11\x02\x00\x87\x66\x04\x00\x01\x00", # DP 102: energy_flow_a = 0 (Forward) + b"\x09\x1f\x02\x00\x04\x69\x02\x00\x04\x00\x00\x01\xf4", # DP 105: power_b = 500 + b"\x09\x11\x02\x00\x87\x68\x04\x00\x01\x00", # DP 104: energy_flow_b = 0 (Forward) + 1000, # Expected power A (positive for forward) + 500, # Expected power B (positive for forward) + 1500, # Expected total + ), + # Reverse flow for both CTs - _TZE204 model + ( + "_TZE204_81yrt3lo", + b"\x09\x1f\x02\x00\x04\x65\x02\x00\x04\x00\x00\x03\xe8", # DP 101: power_a = 1000 + b"\x09\x0a\x02\x00\x80\x66\x04\x00\x01\x01", # DP 102: energy_flow_a = 1 (Reverse) + b"\x09\x1f\x02\x00\x04\x69\x02\x00\x04\x00\x00\x01\xf4", # DP 105: power_b = 500 + b"\x09\x0a\x02\x00\x80\x68\x04\x00\x01\x01", # DP 104: energy_flow_b = 1 (Reverse) + -1000, # Expected power A (negative for reverse) + -500, # Expected power B (negative for reverse) + -1500, # Expected total + ), + # Mixed flow directions - _TZE204 model + ( + "_TZE204_81yrt3lo", + b"\x09\x1f\x02\x00\x04\x65\x02\x00\x04\x00\x00\x04\x00", # DP 101: power_a = 1024 + b"\x09\x11\x02\x00\x87\x66\x04\x00\x01\x00", # DP 102: energy_flow_a = 0 (Forward) + b"\x09\x0a\x02\x00\x04\x69\x02\x00\x04\x00\x00\x02\x00", # DP 105: power_b = 512 + b"\x09\x0a\x02\x00\x80\x68\x04\x00\x01\x01", # DP 104: energy_flow_b = 1 (Reverse) + 1024, # Expected power A (positive for forward) + -512, # Expected power B (negative for reverse) + 512, # Expected total (1024 - 512) + ), + # Forward flow for both CTs - _TZE284 model + ( + "_TZE284_81yrt3lo", + b"\x09\x1f\x02\x00\x04\x65\x02\x00\x04\x00\x00\x03\xe8", # DP 101: power_a = 1000 + b"\x09\x11\x02\x00\x87\x66\x04\x00\x01\x00", # DP 102: energy_flow_a = 0 (Forward) + b"\x09\x1f\x02\x00\x04\x69\x02\x00\x04\x00\x00\x01\xf4", # DP 105: power_b = 500 + b"\x09\x11\x02\x00\x87\x68\x04\x00\x01\x00", # DP 104: energy_flow_b = 0 (Forward) + 1000, # Expected power A (positive for forward) + 500, # Expected power B (positive for forward) + 1500, # Expected total + ), + ], +) +async def test_matseeplus_power_reporting( + zigpy_device_from_v2_quirk, + manufacturer, + power_a_msg, + flow_a_msg, + power_b_msg, + flow_b_msg, + expected_power_a, + expected_power_b, + expected_total, +): + """Test power reporting using Tuya DP messages with default settings (late flow mitigation disabled).""" + quirked = zigpy_device_from_v2_quirk(manufacturer, "TS0601") + ep = quirked.endpoints[1] + + tuya_manufacturer = ep.tuya_manufacturer + + def send_dp_message(msg): + """Send and verify a DP message.""" + hdr, data = tuya_manufacturer.deserialize(msg) + status = tuya_manufacturer.handle_get_data(data.data) + assert status == foundation.Status.SUCCESS + + # Send messages in order: flow first, then power (flow is sent first for correct sign application) + send_dp_message(flow_a_msg) + send_dp_message(power_a_msg) + send_dp_message(flow_b_msg) + send_dp_message(power_b_msg) + + # Check power values on electrical measurement clusters + ep1_electrical = quirked.endpoints[1].electrical_measurement + ep2_electrical = quirked.endpoints[2].electrical_measurement + ep3_electrical = quirked.endpoints[3].electrical_measurement + + assert ep1_electrical.get("active_power") == expected_power_a + assert ep2_electrical.get("active_power") == expected_power_b + assert ep3_electrical.get("active_power") == expected_total + + +@pytest.mark.parametrize( + "msg,endpoint_id,cluster_name,attr_name,expected_value", + [ + # Metering DP messages + ( + b"\x09\x1f\x02\x00\x04\x6a\x02\x00\x04\x00\x00\x30\x39", + 1, + "smartenergy_metering", + "current_summ_delivered", + 12345, + ), # DP 106: current_summ_delivered CT A + ( + b"\x09\x1f\x02\x00\x04\x6b\x02\x00\x04\x00\x00\x1a\x85", + 1, + "smartenergy_metering", + "current_summ_received", + 6789, + ), # DP 107: current_summ_received CT A + ( + b"\x09\x1f\x02\x00\x04\x6c\x02\x00\x04\x00\x00\xd4\x31", + 2, + "smartenergy_metering", + "current_summ_delivered", + 54321, + ), # DP 108: current_summ_delivered CT B + ( + b"\x09\x1f\x02\x00\x04\x6d\x02\x00\x04\x00\x00\x26\x94", + 2, + "smartenergy_metering", + "current_summ_received", + 9876, + ), # DP 109: current_summ_received CT B + # Electrical measurement DP messages + ( + b"\x09\x1f\x02\x00\x04\x6e\x02\x00\x04\x00\x00\x03\xe8", + 1, + "electrical_measurement", + "power_factor", + 1000, + ), # DP 110: power_factor CT A + ( + b"\x09\x1f\x02\x00\x04\x71\x02\x00\x04\x00\x00\x03\xe8", + 1, + "electrical_measurement", + "rms_current", + 1000, + ), # DP 113: rms_current CT A + ( + b"\x09\x1f\x02\x00\x04\x72\x02\x00\x04\x00\x00\x07\xd0", + 2, + "electrical_measurement", + "rms_current", + 2000, + ), # DP 114: rms_current CT B + ( + b"\x09\x1f\x02\x00\x04\x70\x02\x00\x04\x00\x00\x08\xfc", + 3, + "electrical_measurement", + "rms_voltage", + 2300, + ), # DP 112: rms_voltage (total) + ( + b"\x09\x1f\x02\x00\x04\x6f\x02\x00\x04\x00\x00\x13\x88", + 3, + "electrical_measurement", + "ac_frequency", + 5000, + ), # DP 111: ac_frequency (total) + ( + b"\x09\x1f\x02\x00\x04\x79\x02\x00\x04\x00\x00\x03\xe8", + 2, + "electrical_measurement", + "power_factor", + 1000, + ), # DP 121: power_factor CT B + ], +) +async def test_matseeplus_electrical_and_metering( + zigpy_device_from_v2_quirk, + msg, + endpoint_id, + cluster_name, + attr_name, + expected_value, +): + """Test electrical measurement and metering attributes.""" + quirked = zigpy_device_from_v2_quirk("_TZE204_81yrt3lo", "TS0601") + ep = quirked.endpoints[1] + + tuya_manufacturer = ep.tuya_manufacturer + hdr, data = tuya_manufacturer.deserialize(msg) + status = tuya_manufacturer.handle_get_data(data.data) + assert status == foundation.Status.SUCCESS + + cluster = getattr(quirked.endpoints[endpoint_id], cluster_name) + assert cluster.get(attr_name) == expected_value + + +@pytest.mark.parametrize( + "late_flow_a,late_flow_b", + [ + (False, False), # Both late flow disabled (default behavior) + (True, False), # Only A late flow enabled + (False, True), # Only B late flow enabled + (True, True), # Both late flow enabled + ], +) +async def test_matseeplus_power_signing( + zigpy_device_from_v2_quirk, late_flow_a, late_flow_b +): + """Test power sign application (positive/negative based on energy flow) with and without late flow mitigation.""" + quirked = zigpy_device_from_v2_quirk("_TZE204_81yrt3lo", "TS0601") + ep = quirked.endpoints[1] + + # Set mitigation settings + local_config = ep.local_config + await local_config.write_attributes( + {"late_energy_flow_a": late_flow_a, "late_energy_flow_b": late_flow_b} + ) + + tuya_manufacturer = ep.tuya_manufacturer + ep1_electrical = quirked.endpoints[1].electrical_measurement + ep2_electrical = quirked.endpoints[2].electrical_measurement + ep3_electrical = quirked.endpoints[3].electrical_measurement + + def send_dp_message(msg): + """Send and verify a DP message.""" + hdr, data = tuya_manufacturer.deserialize(msg) + status = tuya_manufacturer.handle_get_data(data.data) + assert status == foundation.Status.SUCCESS + + # Test mid-sequence startup: Simulate ZHA starting when device is mid-sequence + # Receive power_b first (before interval is initialized) + send_dp_message( + b"\x09\x1f\x02\x00\x04\x69\x02\x00\x04\x00\x00\x02\x58" + ) # DP 105: power_b = 600 + + # Power B should not be reported yet (interval not initialized) + assert ep2_electrical.get("active_power") is None + assert ep3_electrical.get("active_power") is None + + # Test with correct device sequence over two intervals + # Interval 1: Establish baseline power values (stored but not reported with mitigation) + send_dp_message( + b"\x09\x11\x02\x00\x87\x66\x04\x00\x01\x00" + ) # DP 102: energy_flow_a = 0 (Forward, for previous/initial) + + send_dp_message( + b"\x09\x1f\x02\x00\x04\x65\x02\x00\x04\x00\x00\x03\x20" + ) # DP 101: power_a = 800 (for current interval) + + # With mitigation enabled, power_a is stored but not reported yet (waits for next interval's flow) + # Without mitigation, power_a uses current flow and reports immediately + if late_flow_a: + assert ep1_electrical.get("active_power") is None + else: + assert ep1_electrical.get("active_power") == 800 + + send_dp_message( + b"\x09\x0a\x02\x00\x80\x68\x04\x00\x01\x01" + ) # DP 104: energy_flow_b = 1 (Reverse, for previous/initial) + + # With mitigation enabled, receiving energy_flow_b triggers reporting of stored power_b (from mid-sequence startup) + # Without mitigation, energy_flow_b doesn't trigger power_b reporting + if late_flow_b: + assert ep2_electrical.get("active_power") == -600 + else: + assert ep2_electrical.get("active_power") is None + + send_dp_message( + b"\x09\x1f\x02\x00\x04\x69\x02\x00\x04\x00\x00\x02\x58" + ) # DP 105: power_b = 600 (for current interval) + + # Check power B after receiving second power_b + # With mitigation: power is stored for next interval (still showing -600 from energy_flow_b processing) + # Without mitigation: power is computed with current flow and reported immediately as -600 + assert ep2_electrical.get("active_power") == -600 + + # Total calculation + # With late_flow_b=True, receiving energy_flow_b processes the mid-sequence power_b and sets report_interval_b=1 + # So total can be calculated when report_interval_a=1 and report_interval_b=1 + if late_flow_a and late_flow_b: + # Both deferred to next interval: report_interval_a=2, report_interval_b=2 + assert ep3_electrical.get("active_power") is None + elif late_flow_a: + # A deferred (report_interval_a=2), B reported (report_interval_b=1), intervals don't match + assert ep3_electrical.get("active_power") is None + else: + # A reported (report_interval_a=1) + # If late_flow_b=True: B reported via energy_flow_b (report_interval_b=1), total=800+(-600)=200 + # If late_flow_b=False: B reported via second power_b (report_interval_b=1), total=800+(-600)=200 + assert ep3_electrical.get("active_power") == 200 + + # Interval 2: Flow messages process powers from interval 1 (flow sent because interval 2 power ≠ 0) + send_dp_message( + b"\x09\x11\x02\x00\x87\x66\x04\x00\x01\x00" + ) # DP 102: energy_flow_a = 0 (Forward, for interval 1) + + # If mitigation enabled, flow_a now processes stored power_a(800) from interval 1 + # Without mitigation, power was already reported in interval 1 + assert ep1_electrical.get("active_power") == 800 + + send_dp_message( + b"\x09\x1f\x02\x00\x04\x65\x02\x00\x04\x00\x00\x03\x20" + ) # DP 101: power_a = 800 (for interval 2, stored for next interval if mitigation enabled) + + send_dp_message( + b"\x09\x0a\x02\x00\x80\x68\x04\x00\x01\x01" + ) # DP 104: energy_flow_b = 1 (Reverse, for interval 1) + + send_dp_message( + b"\x09\x1f\x02\x00\x04\x69\x02\x00\x04\x00\x00\x02\x58" + ) # DP 105: power_b = 600 (for interval 2, stored for next interval if mitigation enabled) + + # Check final power B and total after interval 2 + # If mitigation enabled, flow_b processes stored power_b(600) with reverse flow = -600 + # Without mitigation, power_b was already reported as -600 + assert ep2_electrical.get("active_power") == -600 + # Both channels now at interval 2, total calculated + assert ep3_electrical.get("active_power") == 200 # 800 + (-600) + + +@pytest.mark.parametrize("late_flow_a,late_flow_b", [(True, True), (False, False)]) +async def test_matseeplus_late_flow_non_power_attribute_delay( + zigpy_device_from_v2_quirk, late_flow_a, late_flow_b +): + """Test that non-power attributes are delayed when late flow mitigation is enabled.""" + quirked = zigpy_device_from_v2_quirk("_TZE204_81yrt3lo", "TS0601") + ep = quirked.endpoints[1] + + # Set mitigation settings + local_config = ep.local_config + await local_config.write_attributes( + {"late_energy_flow_a": late_flow_a, "late_energy_flow_b": late_flow_b} + ) + + tuya_manufacturer = ep.tuya_manufacturer + ep1_electrical = quirked.endpoints[1].electrical_measurement + ep2_electrical = quirked.endpoints[2].electrical_measurement + ep3_electrical = quirked.endpoints[3].electrical_measurement + + def send_dp_message(msg): + """Send and verify a DP message.""" + hdr, data = tuya_manufacturer.deserialize(msg) + status = tuya_manufacturer.handle_get_data(data.data) + assert status == foundation.Status.SUCCESS + + # Test CT A current (endpoint 1) + send_dp_message( + b"\x09\x1f\x02\x00\x04\x71\x02\x00\x04\x00\x00\x03\xe8" + ) # DP 113: rms_current CT A = 1000 + + if late_flow_a: + # Current is held + assert ep1_electrical.get("rms_current") is None + + # Send another current message - releases the previous one + send_dp_message( + b"\x09\x1f\x02\x00\x04\x71\x02\x00\x04\x00\x00\x07\xd0" + ) # DP 113: rms_current CT A = 2000 + + # Previous current (1000) is now available + assert ep1_electrical.get("rms_current") == 1000 + else: + # Without mitigation, current is available immediately + assert ep1_electrical.get("rms_current") == 1000 + + # Test CT B current (endpoint 2) + send_dp_message( + b"\x09\x1f\x02\x00\x04\x72\x02\x00\x04\x00\x00\x0b\xb8" + ) # DP 114: rms_current CT B = 3000 + + if late_flow_b: + # Current is held + assert ep2_electrical.get("rms_current") is None + + # Send another current message - releases the previous one + send_dp_message( + b"\x09\x1f\x02\x00\x04\x72\x02\x00\x04\x00\x00\x0f\xa0" + ) # DP 114: rms_current CT B = 4000 + + # Previous current (3000) is now available + assert ep2_electrical.get("rms_current") == 3000 + else: + # Without mitigation, current is available immediately + assert ep2_electrical.get("rms_current") == 3000 + + # Test voltage (endpoint 3 - total/shared) + send_dp_message( + b"\x09\x1f\x02\x00\x04\x70\x02\x00\x04\x00\x00\x08\xfc" + ) # DP 112: rms_voltage = 2300 + + # Voltage is on endpoint 3 (total) and has no delay - always reported immediately + assert ep3_electrical.get("rms_voltage") == 2300 + + +@pytest.mark.parametrize( + "late_flow_a,late_flow_b", + [ + (False, False), # Both late flow disabled (default behavior) + (True, False), # Only A late flow enabled + (False, True), # Only B late flow enabled + (True, True), # Both late flow enabled + ], +) +async def test_matseeplus_late_flow_zero_power_deferral( + zigpy_device_from_v2_quirk, late_flow_a, late_flow_b +): + """Test zero power deferral when device omits flow DP. + + When power is zero, the device omits the energy flow DP. With late flow mitigation + enabled, this causes the quirk to sign the last received power value and defer the + actual zero to the next interval to maintain consistent timing. + """ + quirked = zigpy_device_from_v2_quirk("_TZE204_81yrt3lo", "TS0601") + ep = quirked.endpoints[1] + + # Set mitigation settings + local_config = ep.local_config + await local_config.write_attributes( + {"late_energy_flow_a": late_flow_a, "late_energy_flow_b": late_flow_b} + ) + + tuya_manufacturer = ep.tuya_manufacturer + ep1_electrical = quirked.endpoints[1].electrical_measurement + ep2_electrical = quirked.endpoints[2].electrical_measurement + ep3_electrical = quirked.endpoints[3].electrical_measurement + + def send_dp_message(msg): + """Send and verify a DP message.""" + hdr, data = tuya_manufacturer.deserialize(msg) + status = tuya_manufacturer.handle_get_data(data.data) + assert status == foundation.Status.SUCCESS + + # Establish baseline: two normal intervals with non-zero power + # Interval 1 + send_dp_message(b"\x09\x11\x02\x00\x87\x66\x04\x00\x01\x00") # flow_a = Forward + send_dp_message( + b"\x09\x1f\x02\x00\x04\x65\x02\x00\x04\x00\x00\x03\x20" + ) # power_a = 800 + send_dp_message(b"\x09\x0a\x02\x00\x80\x68\x04\x00\x01\x01") # flow_b = Reverse + send_dp_message( + b"\x09\x1f\x02\x00\x04\x69\x02\x00\x04\x00\x00\x02\x58" + ) # power_b = 600 + + # Interval 2: Flow messages included because interval 2 has non-zero power + send_dp_message( + b"\x09\x11\x02\x00\x87\x66\x04\x00\x01\x00" + ) # flow_a for interval 1 + send_dp_message( + b"\x09\x1f\x02\x00\x04\x65\x02\x00\x04\x00\x00\x03\x20" + ) # power_a = 800 + send_dp_message( + b"\x09\x0a\x02\x00\x80\x68\x04\x00\x01\x01" + ) # flow_b for interval 1 + send_dp_message( + b"\x09\x1f\x02\x00\x04\x69\x02\x00\x04\x00\x00\x02\x58" + ) # power_b = 600 + + # Verify baseline established + assert ep1_electrical.get("active_power") == 800 + assert ep2_electrical.get("active_power") == -600 + assert ep3_electrical.get("active_power") == 200 # 800 + (-600) + + # Interval 3: Zero power values (flow DPs omitted by device) + send_dp_message( + b"\x09\x1f\x02\x00\x04\x65\x02\x00\x04\x00\x00\x00\x00" + ) # power_a = 0 (no flow_a) + + # Check channel A behavior + if late_flow_a: + # Sign previous power (800), deferred zero to interval 4 + assert ep1_electrical.get("active_power") == 800 + assert ep3_electrical.get("active_power") == 200 # 800 + (-600), unchanged + else: + # Zero reported immediately + assert ep1_electrical.get("active_power") == 0 + assert ( + ep3_electrical.get("active_power") == 200 + ) # B still at interval 2, no update + + send_dp_message( + b"\x09\x1f\x02\x00\x04\x69\x02\x00\x04\x00\x00\x00\x00" + ) # power_b = 0 (no flow_b) + + # Check channel B behavior and total + if late_flow_b: + # Sign previous power (-600), deferred zero to interval 4 + assert ep2_electrical.get("active_power") == -600 + expected_total = 200 if late_flow_a else -600 # 800+(-600) or 0+(-600) + assert ep3_electrical.get("active_power") == expected_total + else: + # Zero reported immediately + assert ep2_electrical.get("active_power") == 0 + expected_total = 800 if late_flow_a else 0 # 800+0 or 0+0 + assert ep3_electrical.get("active_power") == expected_total + + # Interval 4: Non-zero power returns, flow DPs sent for interval 3 + send_dp_message( + b"\x09\x11\x02\x00\x87\x66\x04\x00\x01\x00" + ) # flow_a for interval 3 + assert ep1_electrical.get("active_power") == 0 # Deferred zero now released + + send_dp_message( + b"\x09\x1f\x02\x00\x04\x65\x02\x00\x04\x00\x00\x03\x84" + ) # power_a = 900 + + send_dp_message( + b"\x09\x0a\x02\x00\x80\x68\x04\x00\x01\x00" + ) # flow_b for interval 3 + assert ep2_electrical.get("active_power") == 0 # Deferred zero now released + + send_dp_message( + b"\x09\x1f\x02\x00\x04\x69\x02\x00\x04\x00\x00\x02\x58" + ) # power_b = 600 + + # Verify final state at interval 4 + if late_flow_a and late_flow_b: + # Both deferred to interval 5 + assert ep1_electrical.get("active_power") == 0 + assert ep2_electrical.get("active_power") == 0 + assert ep3_electrical.get("active_power") == 0 # 0 + 0 + elif late_flow_a: + # A deferred, B reported + assert ep1_electrical.get("active_power") == 0 + assert ep2_electrical.get("active_power") == 600 + assert ep3_electrical.get("active_power") == 600 # 0 + 600 + elif late_flow_b: + # A reported, B deferred + assert ep1_electrical.get("active_power") == 900 + assert ep2_electrical.get("active_power") == 0 + assert ep3_electrical.get("active_power") == 900 # 900 + 0 + else: + # Both reported immediately + assert ep1_electrical.get("active_power") == 900 + assert ep2_electrical.get("active_power") == 600 + assert ep3_electrical.get("active_power") == 1500 # 900 + 600 diff --git a/zhaquirks/tuya/ts0601_power_matseeplus.py b/zhaquirks/tuya/ts0601_power_matseeplus.py new file mode 100644 index 0000000000..eb98cc3aac --- /dev/null +++ b/zhaquirks/tuya/ts0601_power_matseeplus.py @@ -0,0 +1,587 @@ +"""Tuya MatSeePlus 2 CT Bidirectional Energy Meter.""" + +from __future__ import annotations + +from typing import Any, Final + +from zigpy.quirks.v2.homeassistant import PERCENTAGE, EntityType, UnitOfTime +import zigpy.types as t +from zigpy.zcl.clusters.homeautomation import MeasurementType +from zigpy.zcl.foundation import BaseAttributeDefs, ZCLAttributeDef + +from zhaquirks import LocalDataCluster +from zhaquirks.tuya import ( + TuyaLocalCluster, + TuyaZBElectricalMeasurement, + TuyaZBMeteringClusterWithUnit, +) +from zhaquirks.tuya.builder import TuyaQuirkBuilder +from zhaquirks.tuya.mcu import TuyaMCUCluster + +ENDPOINT_ID_CT_A = 1 +ENDPOINT_ID_CT_B = 2 +ENDPOINT_ID_TOTAL = 3 + + +class TuyaEnergyFlow(t.enum8): + """Energy flow direction attribute values.""" + + Forward = 0x00 + Reverse = 0x01 + + +class MatSeePlusLocalConfig(LocalDataCluster): + """Cluster for storing local configuration. + + Allows control over the delayed energy flow bug mitigation. + """ + + cluster_id: Final[t.uint16_t] = 0xFC00 + name: Final = "Local Configuration" + ep_attribute: Final = "local_config" + + class AttributeDefs(BaseAttributeDefs): + """Configuration attributes.""" + + late_energy_flow_a = ZCLAttributeDef( + id=0x5010, + type=t.Bool, + access="rw", + is_manufacturer_specific=True, + ) + late_energy_flow_b = ZCLAttributeDef( + id=0x5011, + type=t.Bool, + access="rw", + is_manufacturer_specific=True, + ) + + +class MatSeePlusElectricalMeasurement(TuyaZBElectricalMeasurement, TuyaLocalCluster): + """ElectricalMeasurement cluster for MatSeePlus CT energy meter with flow delay mitigation. + + _TZE204_81yrt3lo (app_version: 74, hw_version: 1 and stack_version: 0) has a bug + where the current energy flow values are incorrectly emitted during the next reporting interval. + This means a change in direction result in incorrect power values. + + The bug has remained unfixed for multiple years, with no provided firmware updates from the manufacturer. + When enabled this mitigation holds non-power attribute values until the subsequent interval's attribute report. + This ensures correct values, but introduces a delay in entity updates. + + This is optional and defaults to off because some use cases only have energy flowing in a single direction. + """ + + # Maps endpoint IDs to their corresponding late energy flow configuration attribute names + _EP_MITIGATION_CONFIG_ATTR: dict[int, str] = { + ENDPOINT_ID_CT_A: MatSeePlusLocalConfig.AttributeDefs.late_energy_flow_a.name, + ENDPOINT_ID_CT_B: MatSeePlusLocalConfig.AttributeDefs.late_energy_flow_b.name, + } + + _CONSTANT_ATTRIBUTES: dict[int, Any] = { + **TuyaZBElectricalMeasurement._CONSTANT_ATTRIBUTES, + TuyaZBElectricalMeasurement.AttributeDefs.ac_frequency_divisor.id: 100, + TuyaZBElectricalMeasurement.AttributeDefs.ac_frequency_multiplier.id: 1, + TuyaZBElectricalMeasurement.AttributeDefs.ac_power_divisor.id: 10, + TuyaZBElectricalMeasurement.AttributeDefs.ac_power_multiplier.id: 1, + TuyaZBElectricalMeasurement.AttributeDefs.ac_voltage_divisor.id: 10, + TuyaZBElectricalMeasurement.AttributeDefs.ac_voltage_multiplier.id: 1, + TuyaZBElectricalMeasurement.AttributeDefs.measurement_type.id: MeasurementType.Active_measurement_AC + | MeasurementType.Phase_A_measurement, + } + + _VALID_ATTRIBUTES: set[int] = { + TuyaZBElectricalMeasurement.AttributeDefs.active_power.id, + TuyaZBElectricalMeasurement.AttributeDefs.power_factor.id, + TuyaZBElectricalMeasurement.AttributeDefs.rms_current.id, + } + + def __init__(self, *args, **kwargs): + """Init.""" + self._held_values: dict[str, Any] = {} + super().__init__(*args, **kwargs) + + @property + def _late_energy_flow(self) -> bool: + """Return the config value for the channel endpoint.""" + config_attr = self._EP_MITIGATION_CONFIG_ATTR.get(self.endpoint.endpoint_id) + if not config_attr: + return False + return bool(self.endpoint.device.endpoints[1].local_config.get(config_attr)) + + def _late_energy_flow_handler(self, attr_name: str, value: Any) -> Any: + """Hold non-power attribute values until the next update is received from the device.""" + if attr_name == TuyaZBElectricalMeasurement.AttributeDefs.active_power.name: + return value + + held_value = self._held_values.pop(attr_name, None) + if not self._late_energy_flow: + return value + + self._held_values[attr_name] = value + return held_value + + def update_attribute(self, attr_name: str, value): + """Update the cluster attribute.""" + value = self._late_energy_flow_handler(attr_name, value) + super().update_attribute(attr_name, value) + + +class MatSeePlusElectricalMeasurementTotal(MatSeePlusElectricalMeasurement): + """ElectricalMeasurement cluster for MatSeePlus CT Energy Meter common measurements and total power.""" + + _VALID_ATTRIBUTES: set[int] = { + TuyaZBElectricalMeasurement.AttributeDefs.active_power.id, + TuyaZBElectricalMeasurement.AttributeDefs.ac_frequency.id, + TuyaZBElectricalMeasurement.AttributeDefs.rms_voltage.id, + } + + +class MatSeePlusMetering(TuyaZBMeteringClusterWithUnit, TuyaLocalCluster): + """Metering cluster for MatSeePlus CT energy meter.""" + + _VALID_ATTRIBUTES: set[int] = { + TuyaZBMeteringClusterWithUnit.AttributeDefs.current_summ_delivered.id, + TuyaZBMeteringClusterWithUnit.AttributeDefs.current_summ_received.id, + } + + +class TuyaMatSeePlusManufCluster(TuyaMCUCluster): + """Handle MatSeePlus power datapoint logic, addressing known firmware issues. + + - Sequence power DP value signing and reporting to Electrical Measurement clusters respective of reporting sequence and config. + - Recalculate the AB total power because the reported value on DP 115 is inaccurate due to the flow delay bug. + """ + + ENERGY_FLOW_A: Final = "energy_flow_a" + ENERGY_FLOW_B: Final = "energy_flow_b" + POWER_A: Final = "power_a" + POWER_B: Final = "power_b" + + def __init__(self, *args, **kwargs): + """Init.""" + self._interval: int | None = None + self._interval_complete: bool = True + self._report_interval_a: int | None = None + self._report_interval_b: int | None = None + self._power_a: int | None = None + self._power_b: int | None = None + self._deferred_power_a: int | None = None + self._deferred_power_b: int | None = None + super().__init__(*args, **kwargs) + + @staticmethod + def _align_value_with_energy_flow( + value: int, direction: TuyaEnergyFlow | None + ) -> int | None: + """Align the input value with specified energy flow direction.""" + if value and value > 0 and direction == TuyaEnergyFlow.Reverse: + value = -value + return value + + def _report_power_value(self, value: int, endpoint_id: int): + """Report the power value to the specified ElectricalMeasurement endpoint cluster.""" + self.endpoint.device.endpoints[ + endpoint_id + ].electrical_measurement.update_attribute( + MatSeePlusElectricalMeasurement.AttributeDefs.active_power.name, + value, + ) + + def _maybe_report_total_power(self): + """Calculate and report total power if both channels are ready.""" + if ( + self._interval is not None + and self._interval == self._report_interval_a == self._report_interval_b + and self._power_a is not None + and self._power_b is not None + ): + self._report_power_value(self._power_a + self._power_b, ENDPOINT_ID_TOTAL) + + def _process_power_and_energy_flow( + self, + attr_name: str, + value: int | TuyaEnergyFlow, + power_attr: str, + energy_flow_attr: str, + late_energy_flow: bool, + report_endpoint_id: int, + current_power: int | None, + deferred_power: int | None, + report_interval: int | None, + ) -> tuple[int | None, int | None, int | None]: + """Process power and energy flow DP updates. + + Computes signed power based on DP reporting order and user configuration. + + If late_energy_flow == True, the flow DP value applies to the previous interval's power + (firmware bug on _TZE204_81yrt3lo), so power is signed when the flow DP arrives. + + Special handling for zero power with late_energy_flow enabled: + The device omits the energy flow DP when power is zero, so when zero power arrives, + we sign the last received power value with the stored flow and defer the actual zero + until the next interval. This prevents the zero from overwriting the stored value + before total power calculation and maintains consistent update timing. + + Returns tuple of (current_power, deferred_power, report_interval). + """ + power = None + + # Compute signed power based on configuration and DP reporting order + if self._interval is None: + pass + elif late_energy_flow: + if deferred_power is not None: + # Release deferred power before processing new values + power = deferred_power + deferred_power = None + + if attr_name == energy_flow_attr: + # Sign previous power using current flow value + power = self._align_value_with_energy_flow(self.get(power_attr), value) + elif attr_name == power_attr and value == 0: + # The flow DP was omitted in this interval, sign previous power using stored energy flow + power = self._align_value_with_energy_flow( + self.get(power_attr), self.get(energy_flow_attr) + ) + # Defer zero power until next interval + deferred_power = 0 + elif attr_name == power_attr: + # Sign power immediately + power = self._align_value_with_energy_flow( + value, self.get(energy_flow_attr) + ) + deferred_power = None + + # Update and report current power if there is a new value, otherwise keep existing value + if power is not None: + current_power = power + report_interval = self._interval + self._report_power_value(power, report_endpoint_id) + + return current_power, deferred_power, report_interval + + def update_attribute(self, attr_name: str, value): + """Handle reports to Electrical Measurement power attributes after aligning with energy flow. + + DP reporting sequence per interval: + 1. ENERGY_FLOW_A (omitted if current interval power is 0, contains value for previous interval on _TZE204_81yrt3lo) + 2. POWER_A (for current interval) + 3. ENERGY_FLOW_B (omitted if current interval power is 0, contains value for previous interval on _TZE204_81yrt3lo) + 4. POWER_B (for current interval) + """ + + if attr_name in (self.ENERGY_FLOW_A, self.POWER_A): + # Increment interval when the first A sequence update is received + if self._interval_complete: + self._interval_complete = False + self._interval = (self._interval or 0) + 1 + + # Process new values for ENERGY_FLOW_A and POWER_A + self._power_a, self._deferred_power_a, self._report_interval_a = ( + self._process_power_and_energy_flow( + attr_name, + value, + power_attr=self.POWER_A, + energy_flow_attr=self.ENERGY_FLOW_A, + late_energy_flow=self.endpoint.local_config.get( + MatSeePlusLocalConfig.AttributeDefs.late_energy_flow_a.name + ), + current_power=self._power_a, + deferred_power=self._deferred_power_a, + report_endpoint_id=ENDPOINT_ID_CT_A, + report_interval=self._report_interval_a, + ) + ) + + elif attr_name in (self.ENERGY_FLOW_B, self.POWER_B): + # Process new values for ENERGY_FLOW_B and POWER_B + self._power_b, self._deferred_power_b, self._report_interval_b = ( + self._process_power_and_energy_flow( + attr_name, + value, + power_attr=self.POWER_B, + energy_flow_attr=self.ENERGY_FLOW_B, + late_energy_flow=self.endpoint.local_config.get( + MatSeePlusLocalConfig.AttributeDefs.late_energy_flow_b.name + ), + current_power=self._power_b, + deferred_power=self._deferred_power_b, + report_endpoint_id=ENDPOINT_ID_CT_B, + report_interval=self._report_interval_b, + ) + ) + + # Attempt to report total power + self._maybe_report_total_power() + + # Mark interval complete after POWER_B is processed + if attr_name == self.POWER_B: + self._interval_complete = True + + super().update_attribute(attr_name, value) + + +( + ### MatSee Plus Tuya PJ-1203A 2 CT Bidirectional Energy Meter + TuyaQuirkBuilder("_TZE204_81yrt3lo", "TS0601") + .also_applies_to("_TZE284_81yrt3lo", "TS0601") + .tuya_enchantment() + .adds_endpoint(ENDPOINT_ID_CT_B) + .adds_endpoint(ENDPOINT_ID_TOTAL) + .adds(MatSeePlusElectricalMeasurement) + .adds(MatSeePlusElectricalMeasurement, endpoint_id=ENDPOINT_ID_CT_B) + .adds(MatSeePlusElectricalMeasurement, endpoint_id=ENDPOINT_ID_TOTAL) + .adds(MatSeePlusMetering) + .adds(MatSeePlusMetering, endpoint_id=ENDPOINT_ID_CT_B) + .adds(MatSeePlusLocalConfig) + # Metering attributes + .tuya_dp( + dp_id=106, + ep_attribute=MatSeePlusMetering.ep_attribute, + attribute_name=MatSeePlusMetering.AttributeDefs.current_summ_delivered.name, + ) + .tuya_dp( + dp_id=108, + ep_attribute=MatSeePlusMetering.ep_attribute, + attribute_name=MatSeePlusMetering.AttributeDefs.current_summ_delivered.name, + endpoint_id=ENDPOINT_ID_CT_B, + ) + .tuya_dp( + dp_id=107, + ep_attribute=MatSeePlusMetering.ep_attribute, + attribute_name=MatSeePlusMetering.AttributeDefs.current_summ_received.name, + ) + .tuya_dp( + dp_id=109, + ep_attribute=MatSeePlusMetering.ep_attribute, + attribute_name=MatSeePlusMetering.AttributeDefs.current_summ_received.name, + endpoint_id=ENDPOINT_ID_CT_B, + ) + # Power attributes handled within manufacturer cluster + .tuya_dp_attribute( + dp_id=101, + attribute_name=TuyaMatSeePlusManufCluster.POWER_A, + type=t.uint32_t_be, + ) + .tuya_dp_attribute( + dp_id=105, + attribute_name=TuyaMatSeePlusManufCluster.POWER_B, + type=t.uint32_t_be, + ) + .tuya_dp_attribute( + dp_id=102, + attribute_name=TuyaMatSeePlusManufCluster.ENERGY_FLOW_A, + type=TuyaEnergyFlow, + ) + .tuya_dp_attribute( + dp_id=104, + attribute_name=TuyaMatSeePlusManufCluster.ENERGY_FLOW_B, + type=TuyaEnergyFlow, + ) + # Electrical measurement attributes + .tuya_dp( + dp_id=110, + ep_attribute=MatSeePlusElectricalMeasurement.ep_attribute, + attribute_name=MatSeePlusElectricalMeasurement.AttributeDefs.power_factor.name, + ) + .tuya_dp( + dp_id=121, + ep_attribute=MatSeePlusElectricalMeasurement.ep_attribute, + attribute_name=MatSeePlusElectricalMeasurement.AttributeDefs.power_factor.name, + endpoint_id=ENDPOINT_ID_CT_B, + ) + .tuya_dp( + dp_id=113, + ep_attribute=MatSeePlusElectricalMeasurement.ep_attribute, + attribute_name=MatSeePlusElectricalMeasurement.AttributeDefs.rms_current.name, + ) + .tuya_dp( + dp_id=114, + ep_attribute=MatSeePlusElectricalMeasurement.ep_attribute, + attribute_name=MatSeePlusElectricalMeasurement.AttributeDefs.rms_current.name, + endpoint_id=ENDPOINT_ID_CT_B, + ) + .tuya_dp( + dp_id=112, + ep_attribute=MatSeePlusElectricalMeasurementTotal.ep_attribute, + attribute_name=MatSeePlusElectricalMeasurementTotal.AttributeDefs.rms_voltage.name, + endpoint_id=ENDPOINT_ID_TOTAL, + ) + .tuya_dp( + dp_id=111, + ep_attribute=MatSeePlusElectricalMeasurementTotal.ep_attribute, + attribute_name=MatSeePlusElectricalMeasurementTotal.AttributeDefs.ac_frequency.name, + endpoint_id=ENDPOINT_ID_TOTAL, + ) + # Local configuration attributes + .switch( + MatSeePlusLocalConfig.AttributeDefs.late_energy_flow_a.name, + MatSeePlusLocalConfig.cluster_id, + entity_type=EntityType.CONFIG, + translation_key="mitigate_flow_a_delay", + fallback_name="Mitigate flow A delay", + initially_disabled=False, + ) + .switch( + MatSeePlusLocalConfig.AttributeDefs.late_energy_flow_b.name, + MatSeePlusLocalConfig.cluster_id, + entity_type=EntityType.CONFIG, + translation_key="mitigate_flow_b_delay", + fallback_name="Mitigate flow B delay", + initially_disabled=False, + ) + # Device configuration attributes + .tuya_number( + dp_id=129, + attribute_name="reporting_interval", + type=t.uint32_t_be, + unit=UnitOfTime.SECONDS, + min_value=5, + max_value=60, + step=1, + translation_key="reporting_interval", + fallback_name="Reporting interval", + entity_type=EntityType.CONFIG, + ) + .tuya_number( + dp_id=122, + attribute_name="ac_frequency_coefficient", + type=t.uint32_t_be, + unit=PERCENTAGE, + min_value=0, + max_value=2000, + step=0.1, + multiplier=0.1, + translation_key="calibrate_ac_frequency", + fallback_name="Calibrate AC frequency", + entity_type=EntityType.CONFIG, + initially_disabled=True, + ) + .tuya_number( + dp_id=116, + attribute_name="voltage_coefficient", + type=t.uint32_t_be, + unit=PERCENTAGE, + min_value=0, + max_value=2000, + step=0.1, + multiplier=0.1, + translation_key="calibrate_voltage", + fallback_name="Calibrate voltage", + entity_type=EntityType.CONFIG, + initially_disabled=True, + ) + .tuya_number( + dp_id=119, + attribute_name="current_summ_delivered_coefficient_a", + type=t.uint32_t_be, + unit=PERCENTAGE, + min_value=0, + max_value=2000, + step=0.1, + multiplier=0.1, + translation_key="calibrate_summ_delivered_a", + fallback_name="Calibrate summation delivered A", + entity_type=EntityType.CONFIG, + initially_disabled=True, + ) + .tuya_number( + dp_id=125, + attribute_name="current_summ_delivered_coefficient_b", + type=t.uint32_t_be, + unit=PERCENTAGE, + min_value=0, + max_value=2000, + step=0.1, + multiplier=0.1, + translation_key="calibrate_summ_delivered_b", + fallback_name="Calibrate summation delivered B", + entity_type=EntityType.CONFIG, + initially_disabled=True, + ) + .tuya_number( + dp_id=127, + attribute_name="current_summ_received_coefficient_a", + type=t.uint32_t_be, + unit=PERCENTAGE, + min_value=0, + max_value=2000, + step=0.1, + multiplier=0.1, + translation_key="calibrate_summ_received_a", + fallback_name="Calibrate summation received A", + entity_type=EntityType.CONFIG, + initially_disabled=True, + ) + .tuya_number( + dp_id=128, + attribute_name="current_summ_received_coefficient_b", + type=t.uint32_t_be, + unit=PERCENTAGE, + min_value=0, + max_value=2000, + step=0.1, + multiplier=0.1, + translation_key="calibrate_summ_received_b", + fallback_name="Calibrate summation received B", + entity_type=EntityType.CONFIG, + initially_disabled=True, + ) + .tuya_number( + dp_id=118, + attribute_name="power_coefficient_a", + type=t.uint32_t_be, + unit=PERCENTAGE, + min_value=0, + max_value=2000, + step=0.1, + multiplier=0.1, + translation_key="calibrate_power_a", + fallback_name="Calibrate power A", + entity_type=EntityType.CONFIG, + initially_disabled=True, + ) + .tuya_number( + dp_id=124, + attribute_name="power_coefficient_b", + type=t.uint32_t_be, + unit=PERCENTAGE, + min_value=0, + max_value=2000, + step=0.1, + multiplier=0.1, + translation_key="calibrate_power_b", + fallback_name="Calibrate power B", + entity_type=EntityType.CONFIG, + initially_disabled=True, + ) + .tuya_number( + dp_id=117, + attribute_name="current_coefficient_a", + type=t.uint32_t_be, + unit=PERCENTAGE, + min_value=0, + max_value=2000, + step=0.1, + multiplier=0.1, + translation_key="calibrate_current_a", + fallback_name="Calibrate current A", + entity_type=EntityType.CONFIG, + initially_disabled=True, + ) + .tuya_number( + dp_id=123, + attribute_name="current_coefficient_b", + type=t.uint32_t_be, + unit=PERCENTAGE, + min_value=0, + max_value=2000, + step=0.1, + multiplier=0.1, + translation_key="calibrate_current_b", + fallback_name="Calibrate current B", + entity_type=EntityType.CONFIG, + initially_disabled=True, + ) + .add_to_registry(replacement_cluster=TuyaMatSeePlusManufCluster) +)