Skip to content

Commit

Permalink
feat: add specific gravity trend sensor
Browse files Browse the repository at this point in the history
  • Loading branch information
sairon committed May 19, 2023
1 parent 98d2971 commit bc626eb
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 32 deletions.
35 changes: 35 additions & 0 deletions src/rapt_ble/custom_state_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import sensor_state_data.enum


class SensorDeviceClass(sensor_state_data.BaseDeviceClass):
# inherited fields

BATTERY = sensor_state_data.DeviceClass.BATTERY

SIGNAL_STRENGTH = sensor_state_data.DeviceClass.SIGNAL_STRENGTH

SPECIFIC_GRAVITY = sensor_state_data.DeviceClass.SPECIFIC_GRAVITY

TEMPERATURE = sensor_state_data.DeviceClass.TEMPERATURE

# library-specific fields

SPECIFIC_GRAVITY_TREND = "specific_gravity_trend"


class Units(sensor_state_data.enum.StrEnum):
# inherited fields

PERCENTAGE = sensor_state_data.Units.PERCENTAGE

SPECIFIC_GRAVITY = sensor_state_data.Units.SPECIFIC_GRAVITY

SIGNAL_STRENGTH_DECIBELS_MILLIWATT = (
sensor_state_data.Units.SIGNAL_STRENGTH_DECIBELS_MILLIWATT
)

TEMP_CELSIUS = sensor_state_data.Units.TEMP_CELSIUS

# library-specific fields

SPECIFIC_GRAVITY_PER_DAY = "SG/d"
72 changes: 58 additions & 14 deletions src/rapt_ble/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,27 @@
from bluetooth_data_tools import short_address
from bluetooth_sensor_state_data import BluetoothData
from home_assistant_bluetooth import BluetoothServiceInfo
from sensor_state_data import DeviceClass, SensorLibrary, Units
from sensor_state_data import SensorLibrary

from .custom_state_data import SensorDeviceClass, Units

_LOGGER = logging.getLogger(__name__)


RAPTPillMetrics = namedtuple(
"RAPTPillMetrics", "version, mac, temperature, gravity, x, y, z, battery"
"RAPTPillMetrics",
"version, mac, gravity_velocity_valid, gravity_velocity,"
"temperature, gravity, x, y, z, battery",
)

RAPTPillMetricsV1 = namedtuple(
"RAPTPillMetricsV1", "version, mac, temperature, gravity, x, y, z, battery"
)

RAPTPillMetricsV2 = namedtuple(
"RAPTPillMetricsV2",
"version, gravity_velocity_valid, gravity_velocity,"
"temperature, gravity, x, y, z, battery",
)


Expand Down Expand Up @@ -58,15 +72,41 @@ def _process_metrics(self, data: bytes) -> None:
if len(data) != 23:
raise ValueError("Metrics data must have length 23")

# get "raw" data, drop second part of the prefix ("PT"), start with the version
metrics_raw = RAPTPillMetrics._make(unpack(">B6sHfhhhh", data[2:]))
metrics_version = data[2]

metrics_raw: RAPTPillMetrics

if metrics_version == 1:
# get "raw" data, drop second part of the prefix ("PT"),
# start with the version
metrics_raw_v1 = RAPTPillMetricsV1._make(unpack(">B6sHfhhhh", data[2:]))
metrics_raw = RAPTPillMetrics(
**metrics_raw_v1._asdict(),
gravity_velocity=None,
gravity_velocity_valid=None,
)
else:
if metrics_version != 2:
_LOGGER.warning(
"Unexpected RAPT payload version %d, "
"measurements may be incorrect!",
metrics_version,
)
metrics_raw_v2 = RAPTPillMetricsV2._make(unpack(">Bx?fHfhhhh", data[2:]))
metrics_raw = RAPTPillMetrics(**metrics_raw_v2._asdict(), mac=None)

# convert to actual metrics
metrics = RAPTPillMetrics(
version=metrics_raw.version,
mac=hexlify(metrics_raw.mac).decode("ascii")
if metrics_raw.version == 1
else "",
if metrics_version == 1
else None,
gravity_velocity_valid=metrics_raw.gravity_velocity_valid
if metrics_version > 1
else None,
gravity_velocity=metrics_raw.gravity_velocity
if metrics_version > 1
else None,
temperature=round(metrics_raw.temperature / 128 - 273.15, 2),
gravity=round(metrics_raw.gravity / 1000, 4),
x=metrics_raw.x / 16,
Expand All @@ -75,12 +115,6 @@ def _process_metrics(self, data: bytes) -> None:
battery=round(metrics_raw.battery / 256),
)

if metrics.version <= 2:
_LOGGER.warning(
"Unexpected RAPT payload version %d, measurements may be incorrect!",
metrics.version,
)

_LOGGER.debug("Parsed RAPT Pill data: %s", metrics)

self.update_predefined_sensor(
Expand All @@ -90,12 +124,22 @@ def _process_metrics(self, data: bytes) -> None:
SensorLibrary.TEMPERATURE__CELSIUS, metrics.temperature
)
self.update_sensor(
key=DeviceClass.SPECIFIC_GRAVITY,
device_class=DeviceClass.SPECIFIC_GRAVITY,
key=SensorDeviceClass.SPECIFIC_GRAVITY,
device_class=SensorDeviceClass.SPECIFIC_GRAVITY,
native_unit_of_measurement=Units.SPECIFIC_GRAVITY,
native_value=metrics.gravity,
)

if metrics_version >= 2:
self.update_sensor(
key=SensorDeviceClass.SPECIFIC_GRAVITY_TREND,
device_class=SensorDeviceClass.SPECIFIC_GRAVITY_TREND,
native_unit_of_measurement=Units.SPECIFIC_GRAVITY_PER_DAY,
native_value=metrics.gravity_velocity
if metrics.gravity_velocity_valid
else None,
)

def _process_version(self, data: bytes) -> None:
"""Process advertisement with SW version."""
# version payload is e.g. "KEG20220612_050156_81c6d1"
Expand Down
176 changes: 158 additions & 18 deletions tests/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
import pytest
from home_assistant_bluetooth import BluetoothServiceInfo
from sensor_state_data import (
DeviceClass,
DeviceKey,
SensorDescription,
SensorDeviceInfo,
SensorUpdate,
SensorValue,
Units,
)

from rapt_ble.custom_state_data import SensorDeviceClass, Units
from rapt_ble.parser import RAPTPillBluetoothDeviceData


Expand Down Expand Up @@ -46,6 +45,8 @@ def bytes_to_service_info(payload: bytes) -> BluetoothServiceInfo:
b"RAPT\x02\x00\x00\x00\x00\x00\x00\x94\x8bD|\xb9\xf64E\x02b&w*\xac",
# payload v2 - valid gravity velocity
b"RAPT\x02\x00\x01\x3e\x9d\xd1\xab\x94\x8bD|\xb9\xf64E\x02b&w*\xac",
b"RAPTdPillG1",
b"KEG20220612_050156_81c6d1",
],
)
def test_device_supported(data_bytes):
Expand All @@ -63,20 +64,149 @@ def test_parse_version():
assert device._get_device_info(None).sw_version == "20220612_050156_81c6d1"


@pytest.mark.parametrize(
"data_bytes",
[
# payload v1
b"RAPT\x01x\xe3m<\xb9\x94\x94\x8bD|\xb9\xf64E\x02b&w*\xac",
# payload v2 - invalid gravity velocity
b"RAPT\x02\x00\x00\x00\x00\x00\x00\x94\x8bD|\xb9\xf64E\x02b&w*\xac",
# payload v2 - valid gravity velocity
def test_parse_metrics_v1():
device = RAPTPillBluetoothDeviceData()
data = bytes_to_service_info(
b"RAPT\x01x\xe3m<\xb9\x94\x94\x8bD|\xb9\xf64E\x02b&w*\xac"
)
result = device.update(data)
assert result == SensorUpdate(
title="RAPT Pill 4455",
devices={
None: SensorDeviceInfo(
name="RAPT Pill 4455",
manufacturer="RAPT",
model="RAPT Pill hydrometer",
hw_version=None,
sw_version=None,
),
},
entity_descriptions={
DeviceKey(key="specific_gravity", device_id=None): SensorDescription(
device_key=DeviceKey(key="specific_gravity", device_id=None),
device_class=SensorDeviceClass.SPECIFIC_GRAVITY,
native_unit_of_measurement=Units.SPECIFIC_GRAVITY,
),
DeviceKey(key="temperature", device_id=None): SensorDescription(
device_key=DeviceKey(key="temperature", device_id=None),
device_class=SensorDeviceClass.TEMPERATURE,
native_unit_of_measurement=Units.TEMP_CELSIUS,
),
DeviceKey(key="battery", device_id=None): SensorDescription(
device_key=DeviceKey(key="battery", device_id=None),
device_class=SensorDeviceClass.BATTERY,
native_unit_of_measurement=Units.PERCENTAGE,
),
DeviceKey(key="signal_strength", device_id=None): SensorDescription(
device_key=DeviceKey(key="signal_strength", device_id=None),
device_class=SensorDeviceClass.SIGNAL_STRENGTH,
native_unit_of_measurement=Units.SIGNAL_STRENGTH_DECIBELS_MILLIWATT,
),
},
entity_values={
DeviceKey(key="specific_gravity", device_id=None): SensorValue(
device_key=DeviceKey("specific_gravity", device_id=None),
name="Specific Gravity",
native_value=1.0109,
),
DeviceKey(key="temperature", device_id=None): SensorValue(
device_key=DeviceKey(key="temperature", device_id=None),
name="Temperature",
native_value=23.94,
),
DeviceKey(key="battery", device_id=None): SensorValue(
device_key=DeviceKey(key="battery", device_id=None),
name="Battery",
native_value=43,
),
DeviceKey(key="signal_strength", device_id=None): SensorValue(
device_key=DeviceKey(key="signal_strength", device_id=None),
name="Signal Strength",
native_value=-60,
),
},
)


def test_parse_metrics_v2():
device = RAPTPillBluetoothDeviceData()
data = bytes_to_service_info(
b"RAPT\x02\x00\x01\x3e\x9d\xd1\xab\x94\x8bD|\xb9\xf64E\x02b&w*\xac",
],
)
def test_parse_metrics(data_bytes):
)
result = device.update(data)
assert result == SensorUpdate(
title="RAPT Pill 4455",
devices={
None: SensorDeviceInfo(
name="RAPT Pill 4455",
manufacturer="RAPT",
model="RAPT Pill hydrometer",
hw_version=None,
sw_version=None,
),
},
entity_descriptions={
DeviceKey(key="specific_gravity", device_id=None): SensorDescription(
device_key=DeviceKey(key="specific_gravity", device_id=None),
device_class=SensorDeviceClass.SPECIFIC_GRAVITY,
native_unit_of_measurement=Units.SPECIFIC_GRAVITY,
),
DeviceKey(key="temperature", device_id=None): SensorDescription(
device_key=DeviceKey(key="temperature", device_id=None),
device_class=SensorDeviceClass.TEMPERATURE,
native_unit_of_measurement=Units.TEMP_CELSIUS,
),
DeviceKey(key="battery", device_id=None): SensorDescription(
device_key=DeviceKey(key="battery", device_id=None),
device_class=SensorDeviceClass.BATTERY,
native_unit_of_measurement=Units.PERCENTAGE,
),
DeviceKey(key="signal_strength", device_id=None): SensorDescription(
device_key=DeviceKey(key="signal_strength", device_id=None),
device_class=SensorDeviceClass.SIGNAL_STRENGTH,
native_unit_of_measurement=Units.SIGNAL_STRENGTH_DECIBELS_MILLIWATT,
),
DeviceKey(key="specific_gravity_trend", device_id=None): SensorDescription(
device_key=DeviceKey(key="specific_gravity_trend", device_id=None),
device_class=SensorDeviceClass.SPECIFIC_GRAVITY_TREND,
native_unit_of_measurement=Units.SPECIFIC_GRAVITY_PER_DAY,
),
},
entity_values={
DeviceKey(key="specific_gravity", device_id=None): SensorValue(
device_key=DeviceKey("specific_gravity", device_id=None),
name="Specific Gravity",
native_value=1.0109,
),
DeviceKey(key="temperature", device_id=None): SensorValue(
device_key=DeviceKey(key="temperature", device_id=None),
name="Temperature",
native_value=23.94,
),
DeviceKey(key="battery", device_id=None): SensorValue(
device_key=DeviceKey(key="battery", device_id=None),
name="Battery",
native_value=43,
),
DeviceKey(key="signal_strength", device_id=None): SensorValue(
device_key=DeviceKey(key="signal_strength", device_id=None),
name="Signal Strength",
native_value=-60,
),
DeviceKey(key="specific_gravity_trend", device_id=None): SensorValue(
device_key=DeviceKey("specific_gravity_trend", device_id=None),
name="Specific Gravity Trend",
native_value=0.30824026465415955,
),
},
)


def test_parse_metrics_v2_no_velocity():
device = RAPTPillBluetoothDeviceData()
data = bytes_to_service_info(data_bytes)
data = bytes_to_service_info(
b"RAPT\x02\x00\x00\x00\x00\x00\x00\x94\x8bD|\xb9\xf64E\x02b&w*\xac",
)
result = device.update(data)
assert result == SensorUpdate(
title="RAPT Pill 4455",
Expand All @@ -92,24 +222,29 @@ def test_parse_metrics(data_bytes):
entity_descriptions={
DeviceKey(key="specific_gravity", device_id=None): SensorDescription(
device_key=DeviceKey(key="specific_gravity", device_id=None),
device_class=DeviceClass.SPECIFIC_GRAVITY,
device_class=SensorDeviceClass.SPECIFIC_GRAVITY,
native_unit_of_measurement=Units.SPECIFIC_GRAVITY,
),
DeviceKey(key="temperature", device_id=None): SensorDescription(
device_key=DeviceKey(key="temperature", device_id=None),
device_class=DeviceClass.TEMPERATURE,
device_class=SensorDeviceClass.TEMPERATURE,
native_unit_of_measurement=Units.TEMP_CELSIUS,
),
DeviceKey(key="battery", device_id=None): SensorDescription(
device_key=DeviceKey(key="battery", device_id=None),
device_class=DeviceClass.BATTERY,
device_class=SensorDeviceClass.BATTERY,
native_unit_of_measurement=Units.PERCENTAGE,
),
DeviceKey(key="signal_strength", device_id=None): SensorDescription(
device_key=DeviceKey(key="signal_strength", device_id=None),
device_class=DeviceClass.SIGNAL_STRENGTH,
device_class=SensorDeviceClass.SIGNAL_STRENGTH,
native_unit_of_measurement=Units.SIGNAL_STRENGTH_DECIBELS_MILLIWATT,
),
DeviceKey(key="specific_gravity_trend", device_id=None): SensorDescription(
device_key=DeviceKey(key="specific_gravity_trend", device_id=None),
device_class=SensorDeviceClass.SPECIFIC_GRAVITY_TREND,
native_unit_of_measurement=Units.SPECIFIC_GRAVITY_PER_DAY,
),
},
entity_values={
DeviceKey(key="specific_gravity", device_id=None): SensorValue(
Expand All @@ -132,5 +267,10 @@ def test_parse_metrics(data_bytes):
name="Signal Strength",
native_value=-60,
),
DeviceKey(key="specific_gravity_trend", device_id=None): SensorValue(
device_key=DeviceKey("specific_gravity_trend", device_id=None),
name="Specific Gravity Trend",
native_value=None,
),
},
)

0 comments on commit bc626eb

Please sign in to comment.