diff --git a/awx/main/tasks/receptor.py b/awx/main/tasks/receptor.py
index 2fbf6791ed4c..f3fb91c57395 100644
--- a/awx/main/tasks/receptor.py
+++ b/awx/main/tasks/receptor.py
@@ -228,22 +228,24 @@ class RemoteJobError(RuntimeError):
     pass
 
 
-def run_until_complete(node, timing_data=None, **kwargs):
+def run_until_complete(node, timing_data=None, worktype='ansible-runner', ttl='20s', **kwargs):
     """
     Runs an ansible-runner work_type on remote node, waits until it completes, then returns stdout.
     """
+
     config_data = read_receptor_config()
     receptor_ctl = get_receptor_ctl(config_data)
 
     use_stream_tls = getattr(get_conn_type(node, receptor_ctl), 'name', None) == "STREAMTLS"
     kwargs.setdefault('tlsclient', get_tls_client(config_data, use_stream_tls))
-    kwargs.setdefault('ttl', '20s')
+    if ttl is not None:
+        kwargs['ttl'] = ttl
     kwargs.setdefault('payload', '')
     if work_signing_enabled(config_data):
         kwargs['signwork'] = True
 
     transmit_start = time.time()
-    result = receptor_ctl.submit_work(worktype='ansible-runner', node=node, **kwargs)
+    result = receptor_ctl.submit_work(worktype=worktype, node=node, **kwargs)
 
     unit_id = result['unitid']
     run_start = time.time()
@@ -371,7 +373,7 @@ def _convert_args_to_cli(vargs):
     return args
 
 
-def worker_cleanup(node_name, vargs, timeout=300.0):
+def worker_cleanup(node_name, vargs):
     args = _convert_args_to_cli(vargs)
 
     remote_command = ' '.join(args)
diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py
index 6d161d2ef8b5..e48fa58cd1b8 100644
--- a/awx/main/tasks/system.py
+++ b/awx/main/tasks/system.py
@@ -25,6 +25,7 @@
 from django.utils.translation import gettext_noop
 from django.core.cache import cache
 from django.core.exceptions import ObjectDoesNotExist
+from django.db.models.query import QuerySet
 
 # Django-CRUM
 from crum import impersonate
@@ -379,7 +380,21 @@ def purge_old_stdout_files():
             logger.debug("Removing {}".format(os.path.join(settings.JOBOUTPUT_ROOT, f)))
 
 
-def _cleanup_images_and_files(**kwargs):
+class CleanupImagesAndFilesHelper:
+    @classmethod
+    def get_first_control_instance(cls) -> Instance | None:
+        return (
+            Instance.objects.filter(node_type__in=['hybrid', 'control'], node_state=Instance.States.READY, enabled=True, capacity__gt=0)
+            .order_by('-hostname')
+            .first()
+        )
+
+    @classmethod
+    def get_execution_instances(cls) -> QuerySet[Instance]:
+        return Instance.objects.filter(node_type='execution', node_state=Instance.States.READY, enabled=True, capacity__gt=0)
+
+
+def _cleanup_images_and_files(worktype='ansible-runner', ttl='20s', **kwargs):
     if settings.IS_K8S:
         return
     this_inst = Instance.objects.me()
@@ -394,13 +409,10 @@ def _cleanup_images_and_files(**kwargs):
             logger.info(f'Performed local cleanup with kwargs {kwargs}, output:\n{stdout}')
 
     # if we are the first instance alphabetically, then run cleanup on execution nodes
-    checker_instance = (
-        Instance.objects.filter(node_type__in=['hybrid', 'control'], node_state=Instance.States.READY, enabled=True, capacity__gt=0)
-        .order_by('-hostname')
-        .first()
-    )
+    checker_instance = CleanupImagesAndFilesHelper.get_first_control_instance()
+
     if checker_instance and this_inst.hostname == checker_instance.hostname:
-        for inst in Instance.objects.filter(node_type='execution', node_state=Instance.States.READY, enabled=True, capacity__gt=0):
+        for inst in CleanupImagesAndFilesHelper.get_execution_instances():
             runner_cleanup_kwargs = inst.get_cleanup_task_kwargs(**kwargs)
             if not runner_cleanup_kwargs:
                 continue
diff --git a/awx/main/tests/live/tests/conftest.py b/awx/main/tests/live/tests/conftest.py
index 7531e3ceae11..7e9b0f635af4 100644
--- a/awx/main/tests/live/tests/conftest.py
+++ b/awx/main/tests/live/tests/conftest.py
@@ -1,7 +1,10 @@
+import subprocess
 import time
 
 import pytest
 
+from unittest import mock
+
 # These tests are invoked from the awx/main/tests/live/ subfolder
 # so any fixtures from higher-up conftest files must be explicitly included
 from awx.main.tests.functional.conftest import *  # noqa
@@ -59,3 +62,20 @@ def default_org():
 def demo_inv(default_org):
     inventory, _ = Inventory.objects.get_or_create(name='Demo Inventory', defaults={'organization': default_org})
     return inventory
+
+
+@pytest.fixture
+def podman_image_generator():
+    """
+    Generate a tagless podman image from awx base EE
+    """
+
+    def fn():
+        dockerfile = """
+        FROM quay.io/ansible/awx-ee:latest
+        RUN echo "Hello, Podman!" > /tmp/hello.txt
+        """
+        cmd = ['podman', 'build', '-f', '-']  # Create an image without a tag
+        subprocess.run(cmd, capture_output=True, input=dockerfile, text=True, check=True)
+
+    return fn
diff --git a/awx/main/tests/live/tests/test_cleanup_task.py b/awx/main/tests/live/tests/test_cleanup_task.py
index 137032b48cb1..1a98eef13fed 100644
--- a/awx/main/tests/live/tests/test_cleanup_task.py
+++ b/awx/main/tests/live/tests/test_cleanup_task.py
@@ -1,9 +1,19 @@
+from contextlib import nullcontext
 import os
+import json
+import pytest
 import tempfile
 import subprocess
+from unittest import mock
 
-from awx.main.tasks.receptor import _convert_args_to_cli
+from awx.main.tasks.receptor import _convert_args_to_cli, run_until_complete
 from awx.main.models import Instance, JobTemplate
+from awx.main.tasks.system import _cleanup_images_and_files
+
+
+def get_podman_images():
+    cmd = ['podman', 'images', '--format', 'json']
+    return json.loads((subprocess.run(cmd, capture_output=True, text=True, check=True)).stdout)
 
 
 def test_folder_cleanup_multiple_running_jobs_execution_node(request):
@@ -37,3 +47,34 @@ def delete_jobs():
     print('ansible-runner worker ' + remote_command)
 
     assert [os.path.exists(job_dir) for job_dir in job_dirs] == [True for i in range(3)]
+
+
+@pytest.mark.parametrize(
+    'worktype',
+    ('remote', 'local'),
+)
+def test_tagless_image(podman_image_generator, worktype):
+    """
+    Ensure podman images on Control and Hybrid nodes are deleted during cleanup.
+    """
+    podman_image_generator()
+
+    dangling_image = next((image for image in get_podman_images() if image.get('Dangling', False)), None)
+    assert dangling_image
+
+    with (
+        (
+            mock.patch('awx.main.tasks.receptor.run_until_complete', lambda *args, **kwargs: run_until_complete(*args, worktype='local', ttl=None, **kwargs))
+            if worktype == 'local'
+            else nullcontext()
+        ),
+        (
+            mock.patch('awx.main.tasks.system.CleanupImagesAndFilesHelper.get_execution_instances', lambda: [Instance.objects.me()])
+            if worktype == 'local'
+            else nullcontext()
+        ),
+    ):
+        _cleanup_images_and_files(image_prune=True)
+
+    for image in get_podman_images():
+        assert image['Id'] != dangling_image['Id']