Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bookworm/Pi5 Compatibility: Upgrade to latest boilerplate, port to gpiod #36

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 23 additions & 18 deletions grow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,52 @@
import threading
import time

import RPi.GPIO as GPIO
import gpiodevice

from . import pwm

PLATFORMS = {
"Raspberry Pi 5": {"piezo": ("PIN33", pwm.OUTL)},
"Raspberry Pi 4": {"piezo": ("GPIO13", pwm.OUTL)},
}


class Piezo():
def __init__(self, gpio_pin=13):
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.setup(gpio_pin, GPIO.OUT, initial=GPIO.LOW)
self.pwm = GPIO.PWM(gpio_pin, 440)
self.pwm.start(0)
def __init__(self, gpio_pin=None):

if gpio_pin is None:
gpio_pin = gpiodevice.get_pins_for_platform(PLATFORMS)[0]
elif isinstance(gpio_pin, str):
gpio_pin = gpiodevice.get_pin(gpio_pin, "piezo", pwm.OUTL)

self.pwm = pwm.PWM(gpio_pin)
self._timeout = None
atexit.register(self._exit)
pwm.PWM.start_thread()
atexit.register(pwm.PWM.stop_thread)

def frequency(self, value):
"""Change the piezo frequency.

Loosely corresponds to musical pitch, if you suspend disbelief.

"""
self.pwm.ChangeFrequency(value)
self.pwm.set_frequency(value)

def start(self, frequency=None):
def start(self, frequency):
"""Start the piezo.

Sets the Duty Cycle to 100%
Sets the Duty Cycle to 50%

"""
if frequency is not None:
self.frequency(frequency)
self.pwm.ChangeDutyCycle(1)
self.pwm.start(frequency=frequency, duty_cycle=0.5)

def stop(self):
"""Stop the piezo.

Sets the Duty Cycle to 0%

"""
self.pwm.ChangeDutyCycle(0)
self.pwm.stop()

def beep(self, frequency=440, timeout=0.1, blocking=True, force=False):
"""Beep the piezo for time seconds.
Expand All @@ -67,6 +75,3 @@ def beep(self, frequency=440, timeout=0.1, blocking=True, force=False):
self.start(frequency=frequency)
self._timeout.start()
return True

def _exit(self):
self.pwm.stop()
40 changes: 23 additions & 17 deletions grow/pump.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,20 @@
import threading
import time

import RPi.GPIO as GPIO
import gpiodevice

PUMP_1_PIN = 17
PUMP_2_PIN = 27
PUMP_3_PIN = 22
from . import pwm

PUMP_1_PIN = "PIN11" # 17
PUMP_2_PIN = "PIN13" # 27
PUMP_3_PIN = "PIN15" # 22
PUMP_PWM_FREQ = 10000
PUMP_MAX_DUTY = 90
PUMP_MAX_DUTY = 0.9

PLATFORMS = {
"Raspberry Pi 5": {"pump1": ("PIN11", pwm.OUTL), "pump2": ("PIN12", pwm.OUTL), "pump3": ("PIN15", pwm.OUTL)},
"Raspberry Pi 4": {"pump1": ("GPIO17", pwm.OUTL), "pump2": ("GPIO27", pwm.OUTL), "pump3": ("GPIO22", pwm.OUTL)},
}


global_lock = threading.Lock()
Expand All @@ -17,6 +24,8 @@
class Pump(object):
"""Grow pump driver."""

PINS = None

def __init__(self, channel=1):
"""Create a new pump.

Expand All @@ -26,21 +35,18 @@ def __init__(self, channel=1):

"""

self._gpio_pin = [PUMP_1_PIN, PUMP_2_PIN, PUMP_3_PIN][channel - 1]
if Pump.PINS is None:
Pump.PINS = gpiodevice.get_pins_for_platform(PLATFORMS)

GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.setup(self._gpio_pin, GPIO.OUT, initial=GPIO.LOW)
self._pwm = GPIO.PWM(self._gpio_pin, PUMP_PWM_FREQ)
self._pwm.start(0)
self._gpio_pin = Pump.PINS[channel - 1]

self._timeout = None
self._pwm = pwm.PWM(self._gpio_pin, PUMP_PWM_FREQ)
self._pwm.start(0)

atexit.register(self._stop)
pwm.PWM.start_thread()
atexit.register(pwm.PWM.stop_thread)

def _stop(self):
self._pwm.stop(0)
GPIO.setup(self._gpio_pin, GPIO.IN)
self._timeout = None

def set_speed(self, speed):
"""Set pump speed (PWM duty cycle)."""
Expand All @@ -52,7 +58,7 @@ def set_speed(self, speed):
elif not global_lock.acquire(blocking=False):
return False

self._pwm.ChangeDutyCycle(int(PUMP_MAX_DUTY * speed))
self._pwm.set_duty_cycle(PUMP_MAX_DUTY * speed)
self._speed = speed
return True

Expand Down
106 changes: 106 additions & 0 deletions grow/pwm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import time
from threading import Thread

import gpiod
import gpiodevice
from gpiod.line import Direction, Value

OUTL = gpiod.LineSettings(direction=Direction.OUTPUT, output_value=Value.INACTIVE)


class PWM:
_pwms: list = []
_t_pwm: Thread = None
_pwm_running: bool = False

@staticmethod
def start_thread():
if PWM._t_pwm is None:
PWM._pwm_running = True
PWM._t_pwm = Thread(target=PWM._run)
PWM._t_pwm.start()

@staticmethod
def stop_thread():
if PWM._t_pwm is not None:
PWM._pwm_running = False
PWM._t_pwm.join()
PWM._t_pwm = None

@staticmethod
def _add(pwm):
PWM._pwms.append(pwm)

@staticmethod
def _remove(pwm):
index = PWM._pwms.index(pwm)
del PWM._pwms[index]
if len(PWM._pwms) == 0:
PWM.stop_thread()

@staticmethod
def _run():
while PWM._pwm_running:
PWM.run()

@staticmethod
def run():
for pwm in PWM._pwms:
pwm.next(time.time())

def __init__(self, pin, frequency=0, duty_cycle=0, lines=None, offset=None):
self.duty_cycle = 0
self.frequency = 0
self.duty_period = 0
self.period = 0
self.running = False
self.time_start = None
self.state = Value.ACTIVE

self.set_frequency(frequency)
self.set_duty_cycle(duty_cycle)

if isinstance(pin, tuple):
self.lines, self.offset = pin
else:
self.lines, self.offset = gpiodevice.get_pin(pin, "PWM", OUTL)

PWM._add(self)

def set_frequency(self, frequency):
if frequency == 0:
return
self.frequency = frequency
self.period = 1.0 / frequency
self.duty_period = self.duty_cycle * self.period

def set_duty_cycle(self, duty_cycle):
self.duty_cycle = duty_cycle
self.duty_period = self.duty_cycle * self.period

def start(self, duty_cycle=None, frequency=None, start_time=None):
if duty_cycle is not None:
self.set_duty_cycle(duty_cycle)

if frequency is not None:
self.set_frequency(frequency)

self.time_start = time.time() if start_time is None else start_time

self.running = True

def next(self, t):
if not self.running:
return
d = t - self.time_start
d %= self.period
new_state = Value.ACTIVE if d < self.duty_period else Value.INACTIVE
if new_state != self.state:
self.lines.set_value(self.offset, new_state)
self.state = new_state

def stop(self):
self.running = False

def __del__(self):
PWM._remove(self)
4 changes: 2 additions & 2 deletions install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ DATESTAMP=`date "+%Y-%m-%d-%H-%M-%S"`
CONFIG_BACKUP=false
APT_HAS_UPDATED=false
RESOURCES_TOP_DIR=$HOME/Pimoroni
VENV_BASH_SNIPPET=$RESOURCES_DIR/auto_venv.sh
VENV_BASH_SNIPPET=$RESOURCES_TOP_DIR/auto_venv.sh
VENV_DIR=$HOME/.virtualenvs/pimoroni
WD=`pwd`
USAGE="./install.sh (--unstable)"
Expand Down Expand Up @@ -77,7 +77,7 @@ find_config() {
venv_bash_snippet() {
if [ ! -f $VENV_BASH_SNIPPET ]; then
cat << EOF > $VENV_BASH_SNIPPET
# Add `source $RESOURCES_DIR/auto_venv.sh` to your ~/.bashrc to activate
# Add `source $VENV_BASH_SNIPPET` to your ~/.bashrc to activate
# the Pimoroni virtual environment automagically!
VENV_DIR="$VENV_DIR"
if [ ! -f \$VENV_DIR/bin/activate ]; then
Expand Down
16 changes: 12 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ classifiers = [
"Topic :: System :: Hardware",
]
dependencies = [
"ltr559",
"st7735>=0.0.5",
"gpiodevice",
"gpiod>=2.1.3",
"ltr559>=1.0.0",
"st7735>=1.0.0",
"pyyaml",
"fonts",
"font-roboto"
Expand Down Expand Up @@ -121,5 +123,11 @@ ignore = [

[tool.pimoroni]
apt_packages = []
configtxt = []
commands = []
configtxt = [
"dtoverlay=spi0-cs,cs0_pin=14" # Re-assign CS0 from BCM 8 so that Grow can use it
]
commands = [
"printf \"Setting up i2c and SPI..\\n\"",
"sudo raspi-config nonint do_spi 0",
"sudo raspi-config nonint do_i2c 0"
]
45 changes: 17 additions & 28 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,20 @@ def __init__(self, i2c_bus):
@pytest.fixture(scope='function', autouse=True)
def cleanup():
yield None
try:
del sys.modules['grow']
except KeyError:
pass
try:
del sys.modules['grow.moisture']
except KeyError:
pass
try:
del sys.modules['grow.pump']
except KeyError:
pass
for module in ['grow', 'grow.moisture', 'grow.pump']:
try:
del sys.modules[module]
except KeyError:
continue


@pytest.fixture(scope='function', autouse=False)
def GPIO():
"""Mock RPi.GPIO module."""
GPIO = mock.MagicMock()
# Fudge for Python < 37 (possibly earlier)
sys.modules['RPi'] = mock.Mock()
sys.modules['RPi'].GPIO = GPIO
sys.modules['RPi.GPIO'] = GPIO
yield GPIO
del sys.modules['RPi']
del sys.modules['RPi.GPIO']
"""Mock gpiod module."""
gpiod = mock.MagicMock()
sys.modules['gpiod'] = gpiod
yield gpiod
del sys.modules['gpiod']


@pytest.fixture(scope='function', autouse=False)
Expand All @@ -55,13 +44,13 @@ def spidev():


@pytest.fixture(scope='function', autouse=False)
def smbus():
"""Mock smbus module."""
smbus = mock.MagicMock()
smbus.SMBus = SMBusFakeDevice
sys.modules['smbus'] = smbus
yield smbus
del sys.modules['smbus']
def smbus2():
"""Mock smbus2 module."""
smbus2 = mock.MagicMock()
smbus2.SMBus = SMBusFakeDevice
sys.modules['smbus2'] = smbus2
yield smbus2
del sys.modules['smbus2']


@pytest.fixture(scope='function', autouse=False)
Expand Down
6 changes: 3 additions & 3 deletions tests/test_lock.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import time


def test_pumps_actually_stop(GPIO, smbus):
def test_pumps_actually_stop(gpiod, smbus2):
from grow.pump import Pump

ch1 = Pump(channel=1)
Expand All @@ -11,7 +11,7 @@ def test_pumps_actually_stop(GPIO, smbus):
assert ch1.get_speed() == 0


def test_pumps_are_mutually_exclusive(GPIO, smbus):
def test_pumps_are_mutually_exclusive(gpiod, smbus2):
from grow.pump import Pump, global_lock

ch1 = Pump(channel=1)
Expand All @@ -29,7 +29,7 @@ def test_pumps_are_mutually_exclusive(GPIO, smbus):
assert ch3.dose(speed=0.5, blocking=False) is False


def test_pumps_run_sequentially(GPIO, smbus):
def test_pumps_run_sequentially(gpiod, smbus2):
from grow.pump import Pump, global_lock

ch1 = Pump(channel=1)
Expand Down
Loading
Loading