Skip to content

First version of iterativerobotpy.py #161

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion subprojects/robotpy-wpilib/wpilib/__init__.py
Original file line number Diff line number Diff line change
@@ -233,6 +233,8 @@

from .cameraserver import CameraServer
from .deployinfo import getDeployData
from .iterativerobotpy import IterativeRobotPy
from .timedrobotpy import TimedRobotPy

try:
from .version import version as __version__
@@ -241,4 +243,4 @@

from ._impl.main import run

__all__ += ["CameraServer", "run"]
__all__ += ["CameraServer", "run", "IterativeRobotPy", "TimedRobotPy"]
459 changes: 459 additions & 0 deletions subprojects/robotpy-wpilib/wpilib/iterativerobotpy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,459 @@
from enum import Enum

from hal import (
report,
tResourceType,
tInstances,
observeUserProgramDisabled,
observeUserProgramTest,
observeUserProgramAutonomous,
observeUserProgramTeleop,
simPeriodicBefore,
simPeriodicAfter,
)
from ntcore import NetworkTableInstance
from wpilib import (
DriverStation,
DSControlWord,
Watchdog,
LiveWindow,
RobotBase,
SmartDashboard,
reportWarning,
)
from wpilib.shuffleboard import Shuffleboard
import wpimath.units

_kResourceType_SmartDashboard = tResourceType.kResourceType_SmartDashboard
_kSmartDashboard_LiveWindow = tInstances.kSmartDashboard_LiveWindow


class IterativeRobotMode(Enum):
kNone = 0
kDisabled = 1
kAutonomous = 2
kTeleop = 3
kTest = 4


# todo should this be IterativeRobotPy or IterativeRobotBase (replacing) or IterativeRobotBasePy
class IterativeRobotPy(RobotBase):
"""
IterativeRobotPy implements a specific type of robot program framework,
extending the RobotBase class. It provides similar functionality as
IterativeRobotBase does in a python wrapper of C++, but in pure python.
The IterativeRobotPy class does not implement StartCompetition(), so it
should not be used by teams directly.
This class provides the following functions which are called by the main
loop, StartCompetition(), at the appropriate times:
robotInit() -- provide for initialization at robot power-on
DriverStationConnected() -- provide for initialization the first time the DS
is connected
Init() functions -- each of the following functions is called once when the
appropriate mode is entered:
- disabledInit() -- called each and every time disabled is entered from another mode
- autonomousInit() -- called each and every time autonomous is entered from another mode
- teleopInit() -- called each and every time teleop is entered from another mode
- testInit() -- called each and every time test is entered from another mode
Periodic() functions -- each of these functions is called on an interval:
- robotPeriodic()
- disabledPeriodic()
- autonomousPeriodic()
- teleopPeriodic()
- testPeriodic()
Exit() functions -- each of the following functions is called once when the
appropriate mode is exited:
- disabledExit() -- called each and every time disabled is exited
- autonomousExit() -- called each and every time autonomous is exited
- teleopExit() -- called each and every time teleop is exited
- testExit() -- called each and every time test is exited
"""

def __init__(self, period: wpimath.units.seconds) -> None:
"""
Constructor for IterativeRobotPy.
:param period: period of the main robot periodic loop in seconds.
"""
super().__init__()
self._periodS = period
self.watchdog = Watchdog(self._periodS, self.printLoopOverrunMessage)
self._networkTableInstanceDefault = NetworkTableInstance.getDefault()
self._mode: IterativeRobotMode = IterativeRobotMode.kNone
self._lastMode: IterativeRobotMode = IterativeRobotMode.kNone
self._ntFlushEnabled: bool = True
self._lwEnabledInTest: bool = False
self._calledDsConnected: bool = False
self._reportedLw: bool = False
self._robotPeriodicHasRun: bool = False
self._simulationPeriodicHasRun: bool = False
self._disabledPeriodicHasRun: bool = False
self._autonomousPeriodicHasRun: bool = False
self._teleopPeriodicHasRun: bool = False
self._testPeriodicHasRun: bool = False

def robotInit(self) -> None:
"""
Robot-wide initialization code should go here.
Users should override this method for default Robot-wide initialization
which will be called when the robot is first powered on. It will be called
exactly one time.
Note: This method is functionally identical to the class constructor so
that should be used instead.
"""
pass

def driverStationConnected(self) -> None:
"""
Code that needs to know the DS state should go here.
Users should override this method for initialization that needs to occur
after the DS is connected, such as needing the alliance information.
"""
pass

def simulationInit(self) -> None:
"""
Robot-wide simulation initialization code should go here.
Users should override this method for default Robot-wide simulation
related initialization which will be called when the robot is first
started. It will be called exactly one time after robotInit is called
only when the robot is in simulation.
"""
pass

def disabledInit(self) -> None:
"""
Initialization code for disabled mode should go here.
Users should override this method for initialization code which will be
called each time
the robot enters disabled mode.
"""
pass

def autonomousInit(self) -> None:
"""
Initialization code for autonomous mode should go here.
Users should override this method for initialization code which will be
called each time the robot enters autonomous mode.
"""
pass

def teleopInit(self) -> None:
"""
Initialization code for teleop mode should go here.
Users should override this method for initialization code which will be
called each time the robot enters teleop mode.
"""
pass

def testInit(self) -> None:
"""
Initialization code for test mode should go here.
Users should override this method for initialization code which will be
called each time the robot enters test mode.
"""
pass

def robotPeriodic(self) -> None:
"""
Periodic code for all modes should go here.
This function is called each time a new packet is received from the driver
station.
"""
if not self._robotPeriodicHasRun:
print(f"Default robotPeriodic() method...Override me!")
self._robotPeriodicHasRun = True

# todo why is this _simulationPeriodic in previous code? Can it be simulationPeriodic and still work?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed it to _simulationPeriodic to discourage users from using it, and to put simulation stuff in physics.py instead. See also #130

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See also the place where the physics.py is hooked in: https://github.com/robotpy/pyfrc/blob/main/pyfrc/physics/core.py#L123

def simulationPeriodic(self) -> None:
"""
Periodic simulation code should go here.
This function is called in a simulated robot after user code executes.
"""
if not self._simulationPeriodicHasRun:
print(f"Default simulationPeriodic() method...Override me!")
self._simulationPeriodicHasRun = True

def disabledPeriodic(self) -> None:
"""
Periodic code for disabled mode should go here.
Users should override this method for code which will be called each time a
new packet is received from the driver station and the robot is in disabled
mode.
"""
if not self._disabledPeriodicHasRun:
print(f"Default disabledPeriodic() method...Override me!")
self._disabledPeriodicHasRun = True

def autonomousPeriodic(self) -> None:
"""
Periodic code for autonomous mode should go here.
Users should override this method for code which will be called each time a
new packet is received from the driver station and the robot is in
autonomous mode.
"""
if not self._autonomousPeriodicHasRun:
print(f"Default autonomousPeriodic() method...Override me!")
self._autonomousPeriodicHasRun = True

def teleopPeriodic(self) -> None:
"""
Periodic code for teleop mode should go here.
Users should override this method for code which will be called each time a
new packet is received from the driver station and the robot is in teleop
mode.
"""
if not self._teleopPeriodicHasRun:
print(f"Default teleopPeriodic() method...Override me!")
self._teleopPeriodicHasRun = True

def testPeriodic(self) -> None:
"""
Periodic code for test mode should go here.
Users should override this method for code which will be called each time a
new packet is received from the driver station and the robot is in test
mode.
"""
if not self._testPeriodicHasRun:
print(f"Default testPeriodic() method...Override me!")
self._testPeriodicHasRun = True

def disabledExit(self) -> None:
"""
Exit code for disabled mode should go here.
Users should override this method for code which will be called each time
the robot exits disabled mode.
"""
pass

def autonomousExit(self) -> None:
"""
Exit code for autonomous mode should go here.
Users should override this method for code which will be called each time
the robot exits autonomous mode.
"""
pass

def teleopExit(self) -> None:
"""
Exit code for teleop mode should go here.
Users should override this method for code which will be called each time
the robot exits teleop mode.
"""
pass

def testExit(self) -> None:
"""
Exit code for test mode should go here.
Users should override this method for code which will be called each time
the robot exits test mode.
"""
pass

def setNetworkTablesFlushEnabled(self, enabled: bool) -> None:
"""
Enables or disables flushing NetworkTables every loop iteration.
By default, this is enabled.
:deprecated: Deprecated without replacement.
:param enabled: True to enable, false to disable.
"""

self._ntFlushEnabled = enabled

def enableLiveWindowInTest(self, testLW: bool) -> None:
"""
Sets whether LiveWindow operation is enabled during test mode.
:param testLW: True to enable, false to disable. Defaults to false.
@throws if called in test mode.
"""
if self.isTestEnabled():
raise RuntimeError("Can't configure test mode while in test mode!")
if not self._reportedLw and testLW:
report(_kResourceType_SmartDashboard, _kSmartDashboard_LiveWindow)
self._reportedLw = True
self._lwEnabledInTest = testLW

def isLiveWindowEnabledInTest(self) -> bool:
"""
Whether LiveWindow operation is enabled during test mode.
"""
return self._lwEnabledInTest

def getPeriod(self) -> wpimath.units.seconds:
"""
Gets time period between calls to Periodic() functions.
"""
return self._periodS

def _loopFunc(self) -> None:
"""
Loop function.
"""
DriverStation.refreshData()
self.watchdog.reset()

isEnabled, isAutonomous, isTest = self.getControlState()
if not isEnabled:
self._mode = IterativeRobotMode.kDisabled
elif isAutonomous:
self._mode = IterativeRobotMode.kAutonomous
elif isTest:
self._mode = IterativeRobotMode.kTest
else:
self._mode = IterativeRobotMode.kTeleop

if not self._calledDsConnected and DSControlWord().isDSAttached():
self._calledDsConnected = True
self.driverStationConnected()

# If self._mode changed, call self._mode exit and entry functions
if self._lastMode is not self._mode:

if self._lastMode is IterativeRobotMode.kDisabled:
self.disabledExit()
elif self._lastMode is IterativeRobotMode.kAutonomous:
self.autonomousExit()
elif self._lastMode is IterativeRobotMode.kTeleop:
self.teleopExit()
elif self._lastMode is IterativeRobotMode.kTest:
if self._lwEnabledInTest:
LiveWindow.setEnabled(False)
Shuffleboard.disableActuatorWidgets()
self.testExit()
"""
todo switch to match statements when we don't build with python 3.9
match self._lastMode:
case IterativeRobotMode.kDisabled:
self.disabledExit()
case IterativeRobotMode.kAutonomous:
self.autonomousExit()
case IterativeRobotMode.kTeleop:
self.teleopExit()
case IterativeRobotMode.kTest:
if self._lwEnabledInTest:
LiveWindow.setEnabled(False)
Shuffleboard.disableActuatorWidgets()
self.testExit()
"""

if self._mode is IterativeRobotMode.kDisabled:
self.disabledInit()
self.watchdog.addEpoch("disabledInit()")
elif self._mode is IterativeRobotMode.kAutonomous:
self.autonomousInit()
self.watchdog.addEpoch("autonomousInit()")
elif self._mode is IterativeRobotMode.kTeleop:
self.teleopInit()
self.watchdog.addEpoch("teleopInit()")
elif self._mode is IterativeRobotMode.kTest:
if self._lwEnabledInTest:
LiveWindow.setEnabled(True)
Shuffleboard.enableActuatorWidgets()
self.testInit()
self.watchdog.addEpoch("testInit()")
"""
match self._mode:
case IterativeRobotMode.kDisabled:
self.disabledInit()
self._watchdog.addEpoch("disabledInit()")
case IterativeRobotMode.kAutonomous:
self.autonomousInit()
self._watchdog.addEpoch("autonomousInit()")
case IterativeRobotMode.kTeleop:
self.teleopInit()
self._watchdog.addEpoch("teleopInit()")
case IterativeRobotMode.kTest:
if self._lwEnabledInTest:
LiveWindow.setEnabled(True)
Shuffleboard.enableActuatorWidgets()
self.testInit()
self._watchdog.addEpoch("testInit()")
"""
self._lastMode = self._mode

# Call the appropriate function depending upon the current robot mode
match self._mode:
case IterativeRobotMode.kDisabled:
observeUserProgramDisabled()
self.disabledPeriodic()
self.watchdog.addEpoch("disabledPeriodic()")
case IterativeRobotMode.kAutonomous:
observeUserProgramAutonomous()
self.autonomousPeriodic()
self.watchdog.addEpoch("autonomousPeriodic()")
case IterativeRobotMode.kTeleop:
observeUserProgramTeleop()
self.teleopPeriodic()
self.watchdog.addEpoch("teleopPeriodic()")
case IterativeRobotMode.kTest:
observeUserProgramTest()
self.testPeriodic()
self.watchdog.addEpoch("testPeriodic()")

self.robotPeriodic()
self.watchdog.addEpoch("robotPeriodic()")

SmartDashboard.updateValues()
self.watchdog.addEpoch("SmartDashboard::UpdateValues()")

LiveWindow.updateValues()
self.watchdog.addEpoch("LiveWindow::UpdateValues()")

Shuffleboard.update()
self.watchdog.addEpoch("Shuffleboard::Update()")

if self.isSimulation():
simPeriodicBefore()
self.simulationPeriodic()
simPeriodicAfter()
self.watchdog.addEpoch("SimulationPeriodic()")

self.watchdog.disable()

# Flush NetworkTables
if self._ntFlushEnabled:
self._networkTableInstanceDefault.flushLocal()

# Warn on loop time overruns
if self.watchdog.isExpired():
self.printWatchdogEpochs()

def printLoopOverrunMessage(self) -> None:
reportWarning(f"Loop time of {self.watchdog.getTimeout()}s overrun", False)

def printWatchdogEpochs(self) -> None:
"""
Prints list of epochs added so far and their times.
"""
self.watchdog.printEpochs()
247 changes: 247 additions & 0 deletions subprojects/robotpy-wpilib/wpilib/timedrobotpy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
from typing import Any, Callable, Iterable, ClassVar
from heapq import heappush, heappop
from hal import (
report,
initializeNotifier,
setNotifierName,
observeUserProgramStarting,
updateNotifierAlarm,
waitForNotifierAlarm,
stopNotifier,
tResourceType,
tInstances,
)
from wpilib import RobotController
import wpimath.units

from .iterativerobotpy import IterativeRobotPy

_getFPGATime = RobotController.getFPGATime
_kResourceType_Framework = tResourceType.kResourceType_Framework
_kFramework_Timed = tInstances.kFramework_Timed

microsecondsAsInt = int


class _Callback:
def __init__(
self,
func: Callable[[], None],
periodUs: microsecondsAsInt,
expirationUs: microsecondsAsInt,
) -> None:
self.func = func
self._periodUs = periodUs
self.expirationUs = expirationUs

@classmethod
def makeCallBack(
cls,
func: Callable[[], None],
startTimeUs: microsecondsAsInt,
periodUs: microsecondsAsInt,
offsetUs: microsecondsAsInt,
) -> "_Callback":

callback = _Callback(func=func, periodUs=periodUs, expirationUs=startTimeUs)

currentTimeUs = _getFPGATime()
callback.expirationUs = offsetUs + callback.calcFutureExpirationUs(
currentTimeUs
)
return callback

def calcFutureExpirationUs(
self, currentTimeUs: microsecondsAsInt
) -> microsecondsAsInt:
# increment the expiration time by the number of full periods it's behind
# plus one to avoid rapid repeat fires from a large loop overrun. We assume
# currentTime ≥ startTimeUs rather than checking for it since the
# callback wouldn't be running otherwise.
# todo does this math work?
# todo does the "// periodUs * periodUs" do the correct integer math?
return (
self.expirationUs
+ self._periodUs
+ ((currentTimeUs - self.expirationUs) // self._periodUs) * self._periodUs
)

def setNextStartTimeUs(self, currentTimeUs: microsecondsAsInt) -> None:
self.expirationUs = self.calcFutureExpirationUs(currentTimeUs)

def __lt__(self, other) -> bool:
return self.expirationUs < other.expirationUs

def __bool__(self) -> bool:
return True


class _OrderedList:
def __init__(self) -> None:
self._data: list[Any] = []

def add(self, item: Any) -> None:
heappush(self._data, item)

def pop(self) -> Any:
return heappop(self._data)

def peek(self) -> Any | None:
if self._data:
return self._data[0]
else:
return None

def __len__(self) -> int:
return len(self._data)

def __iter__(self) -> Iterable[Any]:
return iter(sorted(self._data))

def __contains__(self, item) -> bool:
return item in self._data

def __str__(self) -> str:
return str(sorted(self._data))


# todo what should the name of this class be?
class TimedRobotPy(IterativeRobotPy):
"""
TimedRobotPy implements the IterativeRobotBase robot program framework.
The TimedRobotPy class is intended to be subclassed by a user creating a
robot program.
Periodic() functions from the base class are called on an interval by a
Notifier instance.
"""

kDefaultPeriod: ClassVar[wpimath.units.seconds] = (
0.020 # todo this is a change to keep consistent units in the API
)

def __init__(self, period: wpimath.units.seconds = kDefaultPeriod) -> None:
"""
Constructor for TimedRobotPy.
:param period: period of the main robot periodic loop in seconds.
"""
super().__init__(period)

self._startTimeUs = _getFPGATime()
self._callbacks = _OrderedList()
self.loopStartTimeUs = 0
self.addPeriodic(self._loopFunc, period=self._periodS)

self._notifier, status = initializeNotifier()
if status != 0:
raise RuntimeError(
f"initializeNotifier() returned {self._notifier}, {status}"
)

status = setNotifierName(self._notifier, "TimedRobotPy")
if status != 0:
raise RuntimeError(f"setNotifierName() returned {status}")

report(_kResourceType_Framework, _kFramework_Timed)

def startCompetition(self) -> None:
"""
Provide an alternate "main loop" via startCompetition().
"""
self.robotInit()

if self.isSimulation():
self.simulationInit()

# Tell the DS that the robot is ready to be enabled
print("********** Robot program startup complete **********")
observeUserProgramStarting()

# Loop forever, calling the appropriate mode-dependent function
# (really not forever, there is a check for a break)
while True:
# We don't have to check there's an element in the queue first because
# there's always at least one (the constructor adds one). It's re-enqueued
# at the end of the loop.
callback = self._callbacks.pop()

status = updateNotifierAlarm(self._notifier, callback.expirationUs)
if status != 0:
raise RuntimeError(f"updateNotifierAlarm() returned {status}")

currentTimeUs, status = waitForNotifierAlarm(self._notifier)
if status != 0:
raise RuntimeError(
f"waitForNotifierAlarm() returned currentTimeUs={currentTimeUs} status={status}"
)

if currentTimeUs == 0:
# when HAL_StopNotifier(self.notifier) is called the above waitForNotifierAlarm
# will return a currentTimeUs==0 and the API requires robots to stop any loops.
# See the API for waitForNotifierAlarm
break

self.loopStartTimeUs = _getFPGATime()
self._runCallbackAndReschedule(callback, currentTimeUs)

# Process all other callbacks that are ready to run
while self._callbacks.peek().expirationUs <= currentTimeUs:
callback = self._callbacks.pop()
self._runCallbackAndReschedule(callback, currentTimeUs)

def _runCallbackAndReschedule(
self, callback: _Callback, currentTimeUs: microsecondsAsInt
) -> None:
callback.func()
callback.setNextStartTimeUs(currentTimeUs)
self._callbacks.add(callback)

def endCompetition(self) -> None:
"""
Ends the main loop in startCompetition().
"""
stopNotifier(self._notifier)

def getLoopStartTime(self) -> wpimath.units.seconds:
"""
todo was def getLoopStartTime(self) -> int: (Microseconds)
todo this show be wpimath.units.seconds
Return the system clock time in seconds (todo was microseconds)
for the start of the current
periodic loop. This is in the same time base as Timer.GetFPGATimestamp(),
but is stable through a loop. It is updated at the beginning of every
periodic callback (including the normal periodic loop).
:returns: Robot running time in seconds (todo was microseconds),
as of the start of the current
periodic function.
"""

return self.loopStartTimeUs / 1e6 # units are seconds
Copy link
Member

@virtuald virtuald Apr 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should return seconds? The documentation says it's the same units as GetFPGATimestamp, so it should do so?


def addPeriodic(
self,
callback: Callable[[], None],
period: wpimath.units.seconds,
offset: wpimath.units.seconds = 0.0,
) -> None:
"""
Add a callback to run at a specific period with a starting time offset.
This is scheduled on TimedRobotPy's Notifier, so TimedRobotPy and the callback
run synchronously. Interactions between them are thread-safe.
:param callback: The callback to run.
:param period: The period at which to run the callback.
:param offset: The offset from the common starting time. This is useful
for scheduling a callback in a different timeslot relative
to TimedRobotPy.
"""

self._callbacks.add(
_Callback.makeCallBack(
callback, self._startTimeUs, int(period * 1e6), int(offset * 1e6)
)
)