Skip to content
This repository has been archived by the owner on Oct 12, 2022. It is now read-only.

Commit

Permalink
Add refactor for talk prep
Browse files Browse the repository at this point in the history
* Bugfix: math is hard
  • Loading branch information
myoung34 committed Mar 5, 2020
1 parent 7164200 commit 01f0ecb
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 81 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "Tilty"
version = "0.3.2"
version = "0.3.3"
description = "A pluggable system to receive and transmit bluetooth events from the Tilt Hydrometer"
authors = ["Marcus Young <[email protected]>"]
license = "MIT"
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
long_description=long_description,
long_description_content_type="text/markdown",
py_modules=['tilty', 'blescan'],
version='0.3.2',
version='0.3.3',
packages=find_packages(exclude=['tests*']),
install_requires=[
'Click',
Expand Down
7 changes: 7 additions & 0 deletions tests/test_blescan.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,18 @@

def test_number_packet():
assert blescan.number_packet(b'\x89=') == 35133
assert blescan.number_packet(b'\x98\xc7') == 39111


def test_string_packet():
assert blescan.string_packet(b'\xfe\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') == 'fe000000000000000000000000000000' # noqa
assert blescan.string_packet(b'\x93\xe2\xfdD\x1b\xafhOH\xef>mn\x91\xcb\x14') == '93e2fd441baf684f48ef3e6d6e91cb14' # noqa


def test_packed_bdaddr_to_string():
assert blescan.packed_bdaddr_to_string(b'ID\x8b\xea&b') == '62:26:ea:8b:44:49' # noqa


def test_parse_packet():
assert blescan.parse_packet(b'\x04>+\x02\x01\x03\x01r\xed\x08S\x84=\x1f\x1e\xff\x06\x00\x01\t \x02)\xa7\x93\xe2\xfdD\x1b\xafhOH\xef>mn\x91\xcb\x14\x02$\x98\xc7\xef\xb3') == {'mac': 'ed:72:01:03:01:02', 'uuid': '93e2fd441baf684f48ef3e6d6e91cb14', 'major': 548, 'minor': 39111} # noqa
assert blescan.parse_packet(b'\x04>*\x02\x01\x03\x01w\t\xbc\xd0W\xef\x1e\x02\x01\x04\x1a\xffL\x00\x02\x15\xa4\x95\xbb0\xc5\xb1KD\xb5\x12\x13p\xf0-t\xde\x00B\x03\xf7\xc5\xa7') == {'mac': '09:77:01:03:01:02', 'uuid': 'a495bb30c5b14b44b5121370f02d74de', 'major': 66, 'minor': 1015} # noqa
10 changes: 5 additions & 5 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def test_cli_invalid_params():
assert result.output == 'Usage: run [OPTIONS]\nTry "run --help" for help.\n\nError: no such option: --foo\n' # noqa


@mock.patch('tilty.blescan.parse_events', return_value=[{'uuid': 'foo', 'major': 78, 'minor': 1833}]) # noqa
@mock.patch('tilty.blescan.get_events', return_value=[{'uuid': 'foo', 'major': 78, 'minor': 1833}]) # noqa
@mock.patch('tilty.blescan.hci_le_set_scan_parameters') # noqa
@mock.patch('tilty.blescan.hci_enable_le_scan') # noqa
def test_cli_no_params_no_valid_data(
Expand All @@ -37,10 +37,10 @@ def test_cli_no_params_no_valid_data(
runner = CliRunner()
result = runner.invoke(cli.run, [])
assert result.exit_code == 0
assert result.output == 'Scanning for Tilt data...\n\n' # noqa
assert result.output == 'Scanning for Tilt data...\n' # noqa


@mock.patch('tilty.blescan.parse_events', return_value=[]) # noqa
@mock.patch('tilty.blescan.get_events', return_value=[]) # noqa
@mock.patch('tilty.blescan.hci_le_set_scan_parameters') # noqa
@mock.patch('tilty.blescan.hci_enable_le_scan') # noqa
def test_cli_no_params_no_data(
Expand All @@ -51,9 +51,9 @@ def test_cli_no_params_no_data(
runner = CliRunner()
result = runner.invoke(cli.run, [])
assert result.exit_code == 0
assert result.output == 'Scanning for Tilt data...\n\n' # noqa
assert result.output == 'Scanning for Tilt data...\n' # noqa

@mock.patch('tilty.blescan.parse_events', return_value=[{'uuid': 'a495bb30c5b14b44b5121370f02d74de', 'major': 60, 'minor': 1053}]) # noqa
@mock.patch('tilty.blescan.get_events', return_value=[{'uuid': 'a495bb30c5b14b44b5121370f02d74de', 'major': 60, 'minor': 1053}]) # noqa
@mock.patch('tilty.blescan.hci_le_set_scan_parameters') # noqa
@mock.patch('tilty.blescan.hci_enable_le_scan') # noqa
def test_cli_no_params_success(
Expand Down
2 changes: 1 addition & 1 deletion tests/test_tilty.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from tilty import tilt_device, tilty


@mock.patch('tilty.blescan.parse_events', return_value=[{'uuid': 'foo', 'major': 2, 'minor': 1}]) # noqa
@mock.patch('tilty.blescan.get_events', return_value=[{'uuid': 'foo', 'major': 2, 'minor': 1}]) # noqa
def test_scan_for_tilt_data(
bt_events,
):
Expand Down
122 changes: 52 additions & 70 deletions tilty/blescan.py
Original file line number Diff line number Diff line change
@@ -1,111 +1,93 @@
# -*- coding: utf-8 -*-
# flake8: noqa
# pylint: skip-file
# BLE iBeaconScanner based on https://github.com/adamf/BLE/blob/master/ble-scanner.py
# JCS 06/07/14
# Adapted for Python3 by Michael duPont 2015-04-05

# BLE scanner based on https://github.com/adamf/BLE/blob/master/ble-scanner.py
# BLE scanner, based on https://code.google.com/p/pybluez/source/browse/trunk/examples/advanced/inquiry-with-rssi.py

# https://github.com/pauloborges/bluez/blob/master/tools/hcitool.c for lescan
# https://kernel.googlesource.com/pub/scm/bluetooth/bluez/+/5.6/lib/hci.h for opcodes
# https://github.com/pauloborges/bluez/blob/master/lib/hci.c#L2782 for functions used by lescan

# performs a simple device inquiry, and returns a list of ble advertizements
# discovered device

# NOTE: Python's struct.pack() will add padding bytes unless you make the endianness explicit. Little endian
# should be used for BLE. Always start a struct.pack() format string with "<"

#Installation
#sudo apt-get install libbluetooth-dev bluez
#sudo pip-3.2 install pybluez #pip-3.2 for Python3.2 on Raspberry Pi

import os
# pylint: disable=line-too-long,missing-function-docstring
""" This Module parses iBeacon events for the tilt hydrometer """
import struct
import sys

import bluetooth._bluetooth as bluez

LE_META_EVENT = 0x3e
OGF_LE_CTL=0x08
OCF_LE_SET_SCAN_ENABLE=0x000C

# these are actually subevents of LE_META_EVENT
EVT_LE_CONN_COMPLETE=0x01
EVT_LE_ADVERTISING_REPORT=0x02
def get_socket(device_id):
return bluez.hci_open_dev(device_id)

def getBLESocket(devID):
return bluez.hci_open_dev(devID)

def number_packet(pkt):
# b'\x89='
myInteger = 0
# for(each byte) -> (start at 256 offset) -> byte as int * index -> add up
# b'\x89=' -> loop -> 137*256 -> + -> 61 -> 35133
_int = 0
multiple = 256
for i in range(len(pkt)):
# 35072
# 61
myInteger += struct.unpack("B",pkt[i:i+1])[0] * multiple
_int += struct.unpack("B", pkt[i:i+1])[0] * multiple
multiple = 1
return myInteger # 35133
return _int # 35133


def string_packet(pkt):
# UUID is 16 Bytes
# b'\xfe\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
# so len() is 16
# loop over each byte, get it to hex, build up the string (uuid is 32 chars, 16bytes)
myString = "";
# loop over each byte, get it to hex, build up the string (uuid is 32 chars, 16bytes) # noqa
_str = ""
for i in range(len(pkt)): # 0-16 loop
myString += "%02x" %struct.unpack("B",pkt[i:i+1])[0]
return myString
_str += "%02x" % struct.unpack("B", pkt[i:i+1])[0]
return _str


def packed_bdaddr_to_string(bdaddr_packed):
# iBeacon packets have the mac byte-reversed, reverse with bdaddr_packed[::-1]
# iBeacon packets have the mac byte-reversed, reverse with bdaddr_packed[::-1] # noqa
# b'ID\x8b\xea&b' -> b'b&\xea\x8bDI'
# decode to int -> (98, 38, 234, 139, 68, 73) , join by : as hex -> '62:26:ea:8b:44:49'
return ':'.join('%02x'%i for i in struct.unpack("<BBBBBB", bdaddr_packed[::-1]))
# decode to int -> (98, 38, 234, 139, 68, 73) , join by : as hex -> '62:26:ea:8b:44:49' # noqa
return ':'.join('%02x' % i for i in struct.unpack("<BBBBBB", bdaddr_packed[::-1])) # noqa


def hci_enable_le_scan(sock):
hci_toggle_le_scan(sock, 0x01)


def hci_disable_le_scan(sock):
hci_toggle_le_scan(sock, 0x00)


def hci_toggle_le_scan(sock, enable):
cmd_pkt = struct.pack("<BB", enable, 0x00)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_SCAN_ENABLE, cmd_pkt)
bluez.hci_send_cmd(sock, 0x08, 0x000C, cmd_pkt)


def hci_le_set_scan_parameters(sock):
old_filter = sock.getsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, 14)
sock.getsockopt(bluez.SOL_HCI, bluez.HCI_FILTER, 14)

def parse_events(sock, loop_count=100):
old_filter = sock.getsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, 14)

def get_events(sock, loop_count=100):
old_filter = sock.getsockopt(bluez.SOL_HCI, bluez.HCI_FILTER, 14)
flt = bluez.hci_filter_new()
bluez.hci_filter_all_events(flt)
bluez.hci_filter_set_ptype(flt, bluez.HCI_EVENT_PKT)
sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, flt )
sock.setsockopt(bluez.SOL_HCI, bluez.HCI_FILTER, flt)
beacons = []
for i in range(0, loop_count):
for i in range(0, loop_count): # pylint: disable=unused-variable
pkt = sock.recv(255)
# http://www.havlena.net/wp-content/themes/striking/includes/timthumb.php?src=/wp-content/uploads/ibeacon-packet.png&w=600&zc=1
#pkt = b'\x04>+\x02\x01\x03\x01r\xed\x08S\x84=\x1f\x1e\xff\x06\x00\x01\t \x02)\xa7\x93\xe2\xfdD\x1b\xafhOH\xef>mn\x91\xcb\x14\x02$\x98\xc7\xef\xb3'
# | | | | | | | | | | |
# | event|sub|#rep\ | | mac addr | | uuid | major\ minor| |
ptype, event, plen = struct.unpack("BBB", pkt[:3]) # b'\x04>(' -> (4, 62, 40)
if event == LE_META_EVENT: # 62 -> 0x3e -> HCI Event: LE Meta Event (0x3e) plen 39
subevent, = struct.unpack("B", pkt[3:4]) # b'\x02' -> (2,)
# chop off \x04>+\x02 (the event + subevent)
pkt = pkt[4:] # b'\x01\x03\x01r\xed\x08S\x84=\x1f\x1e\xff\x06\x00\x01\t \x02)\xa7\x93\xe2\xfdD\x1b\xafhOH\xef>mn\x91\xcb\x14\x02$\x98\xc7\xef\xb3'
if subevent == EVT_LE_ADVERTISING_REPORT: # if 0x02 (2) -> all iBeacons use this
num_reports = struct.unpack("B", pkt[0:1])[0] # b'\x01' -> (1,) -> number of reports to receive
for i in range(0, num_reports):
# build the return string
beacons.append({
'mac': packed_bdaddr_to_string(pkt[3:9]), # b'r\xed\x08S\x84='
'uuid': string_packet(pkt[22:6]), # b'\x93\xe2\xfdD\x1b\xafhOH\xef>mn\x91\xcb\x14'
'minor': number_packet(pkt[4:2]), # b'\x98\xc7'
'major': number_packet(pkt[6:4]), # b'\x02$'
})
sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, old_filter )
beacons.append(parse_packet(pkt))
sock.setsockopt(bluez.SOL_HCI, bluez.HCI_FILTER, old_filter)
return beacons


def parse_packet(pkt):
# http://www.havlena.net/wp-content/themes/striking/includes/timthumb.php?src=/wp-content/uploads/ibeacon-packet.png&w=600&zc=1
#pkt = b' \x04>* \x02\x01x03\x01w\t \xbc\xd0W\xef\x1e\x02\x01\x04\x1a\xffL\x00\x02\x15 \xa4\x95\xbb0\xc5\xb1KD\xb5\x12\x13p\xf0-t\xde \x00B \x03\xf7 \xc5\xa7' # noqa
# | | | | | | | | | # noqa
# | preamble+header | PDU | # noqa
# | 3 bytes | x bytes (plen) | # noqa
# | | | mac addr | uuid | unused data | major| minor | tx | # noqa
# | | | | | | temp | gravity | | # noqa
ptype, event, plen = struct.unpack("BBB", pkt[:3]) # b'\x04>+' -> (4, 62, 40) # pylint:disable=unused-variable # noqa
if event == 0x3e: # 62 -> 0x3e -> HCI Event: LE Meta Event (0x3e) plen 44
subevent, = struct.unpack("B", pkt[3:4]) # b'\x02' -> (2,)
if subevent == 0x02: # if 0x02 (2) -> all iBeacons use this
return {
'mac': packed_bdaddr_to_string(pkt[3:9]), # mac -> 6 bytes -> b'\x02\x01\x03\x01w\t' # noqa
'uuid': string_packet(pkt[-22:-6]), # uuid -> 16bytes -> b'\xa4\x95\xbb0\xc5\xb1KD\xb5\x12\x13p\xf0-t\xde' # noqa
'major': number_packet(pkt[-6:-4]), # major -> 2 bytes -> b'\x00B' # noqa
'minor': number_packet(pkt[-4:-2]), # minor -> 2 bytes -> b'\x03\xf7' # noqa
}
return {}
5 changes: 3 additions & 2 deletions tilty/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ def scan_and_emit(device, config):
""" method that does the needful
"""
tilt_data = device.scan_for_tilt_data()
click.echo(tilt_data)
emit(config=config, tilt_data=tilt_data)
if tilt_data:
click.echo(tilt_data)
emit(config=config, tilt_data=tilt_data)


def scan_and_emit_thread(device, config, keep_running=False):
Expand Down
2 changes: 1 addition & 1 deletion tilty/tilt_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def scan_for_tilt_data(self):
""" scan for tilt and return data if found """

data = None
for beacon in blescan.parse_events(self.sock, 10):
for beacon in blescan.get_events(self.sock):
if beacon['uuid'] in constants.TILT_DEVICES:
data = {
'color': constants.TILT_DEVICES[beacon['uuid']],
Expand Down

0 comments on commit 01f0ecb

Please sign in to comment.