Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ __pycache__/

# YOLO / ML
*.pt
!autonomous_parking/models/
!autonomous_parking/models/best.pt

*.onnx
runs/
runs
datasets/
*.cache

Expand Down
233 changes: 136 additions & 97 deletions autonomous_parking/autonomous_parking/vision_node.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,9 @@
#!/usr/bin/env python3
"""Vision node: slot 유형 및 점유 상태 감지.

파이프라인:
1. 전방/좌측/우측 camera image 구독
2. 관측 trigger(/parking/obs_trigger, Bool=True) 수신 시:
- YOLO model이 있으면 ISA mark(장애인 마크) 감지로 slot 유형 보정
- model 없으면 YAML 메타데이터의 slot 유형과 점유 상태를 그대로 사용
3. /parking/slot_states (std_msgs/String, JSON) 발행

YOLO class:
0 = 'isa_mark' - 장애인 전용 마크 → slot 유형을 handicapped로 변경

LiDAR 점유 판단:
관측 포인트에서 각 slot 방향의 LiDAR 거리를 측정한다.
LiDAR 거리 < slot center까지 거리 - OCCUPANCY_MARGIN이면 차량 있음으로 판단.
"""

import json
import math
import os

import numpy as np
import yaml
import rclpy
from rclpy.node import Node
Expand All @@ -28,19 +12,14 @@
from nav_msgs.msg import Odometry
from ament_index_python.packages import get_package_share_directory

try:
from cv_bridge import CvBridge
CV_OK = True
except ImportError:
CV_OK = False

try:
from ultralytics import YOLO
YOLO_OK = True
except ImportError:
YOLO_IMPORT_ERROR = ''
except ImportError as e:
YOLO_OK = False
YOLO_IMPORT_ERROR = str(e)

# LiDAR 거리가 slot center까지 거리보다 이 값 이상 짧으면 점유로 판단
OCCUPANCY_MARGIN = 0.8 # m


Expand All @@ -49,57 +28,85 @@ def __init__(self):
super().__init__('vision_node')

self.declare_parameter('yolo_model', '')
self.declare_parameter('use_lidar_occupancy', False)

model_path = self.get_parameter('yolo_model').value
self.use_lidar_occupancy = bool(self.get_parameter('use_lidar_occupancy').value)

pkg = get_package_share_directory('autonomous_parking')
with open(os.path.join(pkg, 'config', 'slot_metadata.yaml')) as f:
data = yaml.safe_load(f)
self.slots = {s['id']: dict(s) for s in data['slots']}

self.base_slots = {s['id']: dict(s) for s in data['slots']}
self.slots = {sid: dict(s) for sid, s in self.base_slots.items()}

self.slot_rois = {
'front': {
'A1': (60, 60, 240, 210),
'A2': (240, 60, 470, 210),
'B1': (0, 220, 220, 360),
'B2': (220, 220, 470, 360),
},
'left': {
'A1': (0, 60, 180, 220),
'A2': (180, 60, 420, 220),
'B1': (0, 220, 150, 360),
'B2': (150, 220, 320, 360),
},
'right': {
'A1': (320, 120, 470, 250),
'A2': (470, 120, 640, 250),
'B1': (340, 220, 510, 340),
'B2': (510, 220, 640, 340),
},
}

self.model = None
if YOLO_OK and model_path and os.path.isfile(model_path):
self.model = YOLO(model_path)
self.get_logger().info(f'YOLOv8 model 로드 완료: {model_path}')
self.get_logger().info(f'yolo_model parameter: "{model_path}"')
self.get_logger().info(f'YOLO_OK: {YOLO_OK}')
self.get_logger().info(f'use_lidar_occupancy: {self.use_lidar_occupancy}')

if not YOLO_OK:
self.get_logger().warn(f'ultralytics import 실패: {YOLO_IMPORT_ERROR}')
elif not model_path:
self.get_logger().warn('yolo_model 파라미터가 비어 있음')
elif not os.path.isfile(model_path):
self.get_logger().warn(f'모델 파일이 존재하지 않음: {model_path}')
else:
self.get_logger().warn(
'YOLOv8 model 없음 — slot 유형은 YAML 메타데이터 사용. '
'점유 여부도 YAML 메타데이터 사용.'
)

self.bridge = CvBridge() if CV_OK else None
try:
self.model = YOLO(model_path)
self.get_logger().info(f'YOLOv8 model 로드 완료: {model_path}')
except Exception as e:
self.get_logger().error(f'YOLO model 로드 실패: {e}')
self.model = None

# 각 camera의 최신 image (첫 frame 수신 전 None)
self.images = {
'front': None,
'left': None,
'left': None,
'right': None,
}
self.triggered = False

# LiDAR / odometry 상태
self.ranges = None
self.angle_min = -math.pi
self.angle_inc = 2 * math.pi / 360
self.angle_inc = 2 * math.pi / 360.0
self.robot_x = 0.0
self.robot_y = 0.0
self.robot_yaw = 0.0
self.odom_ok = False

# Subscriber
self.create_subscription(Image, '/ego/camera/image_raw', self._mk_image_cb('front'), 10)
self.create_subscription(Image, '/ego/camera_left/image_raw', self._mk_image_cb('left'), 10)
self.create_subscription(Image, '/ego/camera/image_raw', self._mk_image_cb('front'), 10)
self.create_subscription(Image, '/ego/camera_left/image_raw', self._mk_image_cb('left'), 10)
self.create_subscription(Image, '/ego/camera_right/image_raw', self._mk_image_cb('right'), 10)
self.create_subscription(LaserScan, '/ego/scan', self._scan_cb, 10)
self.create_subscription(Odometry, '/ego/odom', self._odom_cb, 10)
self.create_subscription(LaserScan, '/ego/scan', self._scan_cb, 10)
self.create_subscription(Odometry, '/ego/odom', self._odom_cb, 10)
self.create_subscription(Bool, '/parking/obs_trigger', self._trigger_cb, 10)

self.pub = self.create_publisher(String, '/parking/slot_states', 10)
self.create_timer(0.1, self._tick)

self.get_logger().info('Vision node 시작')

# ── Callback ──────────────────────────────────────────────────────────

def _mk_image_cb(self, key):
def _cb(msg):
self.images[key] = msg
Expand All @@ -123,33 +130,70 @@ def _trigger_cb(self, msg):
if msg.data:
self.triggered = True

# ── 메인 tick ──────────────────────────────────────────────────────────

def _tick(self):
if not self.triggered:
return
self.triggered = False

# 매 관측마다 YAML 기본 상태로 초기화
self.slots = {sid: dict(s) for sid, s in self.base_slots.items()}

available = [k for k, v in self.images.items() if v is not None]
self.get_logger().info(f'Vision trigger — 사용 가능 camera: {available}')

if self.model is None:
# occupancy는 기본적으로 YAML 사용
if self.use_lidar_occupancy:
self._check_occupancy_lidar()
else:
self.get_logger().debug('LiDAR occupancy 비활성화 — YAML occupied 사용')

if self.model is None or not available:
self._publish(self.slots)
return

# 1단계: LiDAR로 점유 여부 갱신
self._check_occupancy_lidar()
self._run_yolo(available)

def _image_msg_to_bgr(self, msg: Image):
enc = msg.encoding.lower()

# 2단계: YOLO로 ISA mark 감지 → slot 유형 보정
if CV_OK and available:
self._run_yolo(available)
if enc in ('bgr8', 'rgb8'):
channels = 3
elif enc in ('bgra8', 'rgba8'):
channels = 4
elif enc in ('mono8', '8uc1'):
channels = 1
else:
self._publish(self.slots)
raise ValueError(f'지원하지 않는 image encoding: {msg.encoding}')

raw = np.frombuffer(msg.data, dtype=np.uint8)

row_stride = msg.step
expected = msg.height * row_stride
if raw.size < expected:
raise ValueError(f'이미지 버퍼 크기 부족: got={raw.size}, expected>={expected}')

raw = raw[:expected].reshape((msg.height, row_stride))
pixel_bytes = msg.width * channels
raw = raw[:, :pixel_bytes]

if channels == 1:
img = raw.reshape((msg.height, msg.width))
return np.stack([img, img, img], axis=-1)

# ── LiDAR 점유 감지 ────────────────────────────────────────────────────
img = raw.reshape((msg.height, msg.width, channels))

if enc == 'bgr8':
return img.copy()
if enc == 'rgb8':
return img[:, :, ::-1].copy()
if enc == 'bgra8':
return img[:, :, :3].copy()
if enc == 'rgba8':
return img[:, :, [2, 1, 0]].copy()

raise ValueError(f'처리하지 못한 encoding: {msg.encoding}')

def _lidar_range(self, robot_angle):
"""로봇 frame 기준 각도의 LiDAR 거리값 반환. 유효하지 않으면 inf."""
if not self.ranges:
return float('inf')
idx = round((robot_angle - self.angle_min) / self.angle_inc)
Expand All @@ -158,85 +202,80 @@ def _lidar_range(self, robot_angle):
return float('inf') if (math.isnan(r) or math.isinf(r)) else r

def _check_occupancy_lidar(self):
"""현재 LiDAR scan과 robot pose로 각 slot의 점유 여부를 판단.

slot center 방향의 LiDAR 거리가
center까지 거리 - OCCUPANCY_MARGIN보다 짧으면 차량 있음으로 판단.
"""
if not self.odom_ok or self.ranges is None:
self.get_logger().warn('LiDAR 점유 확인 불가 — odometry 또는 scan 미수신')
return

for sid, slot in self.slots.items():
dx = slot['center_x'] - self.robot_x
dy = slot['center_y'] - self.robot_y
d_to_center = math.hypot(dx, dy)

# slot center 방향각을 로봇 frame으로 변환
angle_world = math.atan2(dy, dx)
angle_robot = angle_world - self.robot_yaw

lidar_d = self._lidar_range(angle_robot)

# center까지 거리보다 OCCUPANCY_MARGIN 이상 짧으면 차량 있음
occupied = lidar_d < (d_to_center - OCCUPANCY_MARGIN)
self.slots[sid]['occupied'] = occupied

self.get_logger().debug(
f"LiDAR 점유: slot {sid} d_center={d_to_center:.2f}m "
f"lidar={lidar_d:.2f}m → {'occupied' if occupied else 'empty'}"
)

# ── YOLO 추론 (ISA mark 전용) ──────────────────────────────────────────

def _run_yolo(self, camera_keys):
"""사용 가능한 모든 camera image에 YOLO 추론을 실행해 ISA mark를 감지.

cls 0 (isa_mark)만 처리한다. 점유 여부는 LiDAR에서 이미 결정됨.
"""
for key in camera_keys:
msg = self.images[key]
if msg is None:
continue

try:
cv_img = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
bgr = self._image_msg_to_bgr(msg)
results = self.model.predict(source=bgr, conf=0.4, verbose=False)
except Exception as e:
self.get_logger().error(f'cv_bridge 오류 [{key}]: {e}')
self.get_logger().error(f'YOLO 처리 실패 [{key}]: {e}')
continue

results = self.model.predict(cv_img, conf=0.4, verbose=False)
for r in results:
for box in r.boxes:
cls = int(box.cls[0])
if cls != 0: # isa_mark(cls 0) 이외 무시
if cls != 0:
continue

cx = float((box.xyxy[0][0] + box.xyxy[0][2]) / 2)
cy = float((box.xyxy[0][1] + box.xyxy[0][3]) / 2)
cx = float((box.xyxy[0][0] + box.xyxy[0][2]) / 2.0)
cy = float((box.xyxy[0][1] + box.xyxy[0][3]) / 2.0)
conf = float(box.conf[0])

best_id = self._match_isa_to_slot(cx, cy)
if best_id:
self.slots[best_id]['type'] = 'handicapped'
slot_id = self._match_isa_to_slot(key, cx, cy)
if slot_id is None:
self.get_logger().info(
f"YOLO ISA mark 감지: slot {best_id} → handicapped "
f"(conf={conf:.2f}, camera={key})"
f'ISA mark 미매칭: camera={key}, conf={conf:.2f}, cx={cx:.1f}, cy={cy:.1f}'
)
continue

self.slots[slot_id]['type'] = 'handicapped'
self.get_logger().info(
f'YOLO ISA mark 감지: slot {slot_id} → handicapped '
f'(camera={key}, conf={conf:.2f}, cx={cx:.1f}, cy={cy:.1f})'
)

self._publish(self.slots)

def _match_isa_to_slot(self, cx, cy):
"""ISA mark bounding box 중심에 가장 가까운 slot ID 반환.
def _match_isa_to_slot(self, camera_key, cx, cy):
candidates = self.slot_rois.get(camera_key, {})
matched = []

완전한 구현은 camera intrinsic matrix K와 robot pose를 이용해
slot 월드 좌표를 image 좌표로 투영해야 한다.
현재는 robot pose 통합이 없으므로 None 반환 → YAML 유형 유지.
"""
return None
for slot_id, (x1, y1, x2, y2) in candidates.items():
if x1 <= cx <= x2 and y1 <= cy <= y2:
roi_cx = (x1 + x2) / 2.0
roi_cy = (y1 + y2) / 2.0
dist = math.hypot(cx - roi_cx, cy - roi_cy)
matched.append((dist, slot_id))

# ── Publish ────────────────────────────────────────────────────────────
if not matched:
return None

matched.sort(key=lambda x: x[0])
return matched[0][1]

def _publish(self, slots):
occupied_ids = [sid for sid, s in slots.items() if s['occupied']]
self.get_logger().info(f'occupied slots: {occupied_ids}')

payload = [
{
'id': sid,
Expand Down Expand Up @@ -270,4 +309,4 @@ def main(args=None):


if __name__ == '__main__':
main()
main()
Binary file added autonomous_parking/models/best.pt
Binary file not shown.
Loading