Labels
- src/maps_backend: Has to do w/ the `maps_backend` package
- tests: stuff related to testing our work!
Description
When making changes to our code, we want to have tests to ensure nothing breaks. However, the maps_backend package doesn't have any tests!
Goals
- Create a test for the `maps_backend` package
- Ensure the frontend receives messages from this backend
- Frontend uses WebSockets, so we'll create a "fake" one in the test to pretend we're the website
- The backend is a ROS 2 node, so we can launch it in the test with some fake settings
Checklist
If you complete all of these subtasks, we can close this issue! :D
- Create a new Colcon test
  - Step 1: Create a new folder at `src/maps_backend/` called `test`
  - Step 2: Add a `test_maps_backend_with_fake_gps.py` Python file
  - Step 3: Add some Python code that looks something like so:
```python
import asyncio
import time
import unittest

import launch_testing.markers
import pytest
import rclpy
import websockets
from launch import LaunchDescription
from launch_ros.actions.node import Node as LaunchRosNode
from launch_testing import ProcInfoHandler
from rclpy.node import Node
from rclpy.publisher import Publisher
from rclpy.qos import QoSPresetProfiles, QoSProfile
from sensor_msgs.msg import NavSatFix
from std_msgs.msg import Header

PUBLISHER_QOS_PROFILE: QoSProfile = QoSPresetProfiles.SENSOR_DATA.value

MAPS_BACKEND_NODE: LaunchRosNode = LaunchRosNode(
    executable="maps_backend",
    package="maps_backend",
    name="maps_backend_instance_in_test",
)


@pytest.mark.launch_test
@launch_testing.markers.keep_alive
def generate_test_description() -> LaunchDescription:
    """
    Creates the test launch description.

    Runs before the test classes below it.
    """
    return LaunchDescription([MAPS_BACKEND_NODE])


class TestMapsBackend(unittest.TestCase):
    """Tests that the `maps_backend` node works properly."""

    def test_node_running(self, proc_info: ProcInfoHandler):
        """Ensures the node starts on time."""
        proc_info.assertWaitForStartup(process=MAPS_BACKEND_NODE, timeout=2.0)  # pyright: ignore[reportAttributeAccessIssue]

    def test_node_resends_gps_msgs_over_websockets(self):
        """Ensures the node properly sends our GPS messages to the frontend."""
        rclpy.init(args=None)
        node: Node = rclpy.create_node("fake_gps_publisher_node")
        gps_pub: Publisher = node.create_publisher(
            NavSatFix, "/sensors/gps", PUBLISHER_QOS_PROFILE
        )

        # define a fake coord
        (lat, lon) = (1.0, 2.0)

        # create a fake navsatfix msg
        msg: NavSatFix = NavSatFix()
        msg.header = Header()  # required to create the message header
        msg.header.frame_id = "gps_link"  # search in `sensors_node` for why this is
        msg.latitude = lat
        msg.longitude = lon
        msg.altitude = 0.0

        # create an async func to properly await the websocket recv
        async def _run():
            async with websockets.connect("ws://192.168.1.68:9001") as ws:
                # we'll continue sending messages for 500 ms
                end_time: float = time.time() + 0.5

                # send many messages to ensure the `maps_backend` sees one
                while time.time() < end_time:
                    msg.header.stamp = node.get_clock().now().to_msg()
                    gps_pub.publish(msg)

                    # tell ros 2 to send off our message
                    rclpy.spin_once(node, timeout_sec=0.01)

                    # yield to the event loop instead of blocking it
                    await asyncio.sleep(0.01)

                got: str = await asyncio.wait_for(ws.recv(), timeout=3.0)
                self.assertEqual(got, f'{{"lat": {lat}, "lon": {lon}}}')

        # run the rest of the test (as defined above)
        try:
            asyncio.run(_run())
        finally:
            node.destroy_node()
            rclpy.shutdown()
```
- Ensure the test runs successfully by typing `just test` into a terminal
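
One thing to watch in the Step 3 sketch: comparing the raw WebSocket message against a hand-built string will fail if the backend changes key order, whitespace, or float formatting. Below is a rough alternative that parses the message first. It assumes the backend sends a JSON object with `lat` and `lon` keys (taken from the expected string above). `payload_matches` and its `tol` parameter are hypothetical names, not something that exists in `maps_backend`.

```python
import json
import math


def payload_matches(raw: str, lat: float, lon: float, tol: float = 1e-9) -> bool:
    """Return True if `raw` is a JSON object whose lat/lon are close to ours.

    Hypothetical helper: the `lat`/`lon` keys are an assumption based on the
    expected string in the test sketch.
    """
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        # not JSON at all
        return False

    if not isinstance(data, dict):
        # valid JSON, but not an object (e.g. a list)
        return False

    try:
        got_lat = float(data["lat"])
        got_lon = float(data["lon"])
    except (KeyError, TypeError, ValueError):
        # missing key or non-numeric value
        return False

    # compare as floats so `1` and `1.0` are treated the same
    return math.isclose(got_lat, lat, abs_tol=tol) and math.isclose(
        got_lon, lon, abs_tol=tol
    )
```

In the test, the final assertion could then become `self.assertTrue(payload_matches(got, lat, lon))`, if we decide we care about the values and not the exact string.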