import asyncio
import os
from queue import Queue

import cv2
import mediapipe as mp
import numpy as np
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2

import livekit

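# Local dev server defaults; replace URL and TOKEN with credentials for
# your own LiveKit deployment.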
URL = 'ws://localhost:7880'
TOKEN = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE5MDY2MTMyODgsImlzcyI6IkFQSVRzRWZpZFpqclFvWSIsIm5hbWUiOiJuYXRpdmUiLCJuYmYiOjE2NzI2MTMyODgsInN1YiI6Im5hdGl2ZSIsInZpZGVvIjp7InJvb20iOiJ0ZXN0Iiwicm9vbUFkbWluIjp0cnVlLCJyb29tQ3JlYXRlIjp0cnVlLCJyb29tSm9pbiI6dHJ1ZSwicm9vbUxpc3QiOnRydWV9fQ.uSNIangMRu8jZD5mnRYoCHjcsQWCrJXgHCs0aNIgBFY'

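# Frames are produced on the room's thread and consumed by the OpenCV
# loop on the main thread, so use a thread-safe Queue to hand them over.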
frame_queue = Queue()
argb_frame = None

# You can download a face landmark model file from https://developers.google.com/mediapipe/solutions/vision/face_landmarker#models
model_file = 'face_landmarker.task'
model_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), model_file)

BaseOptions = mp.tasks.BaseOptions
FaceLandmarker = mp.tasks.vision.FaceLandmarker
FaceLandmarkerOptions = mp.tasks.vision.FaceLandmarkerOptions
VisionRunningMode = mp.tasks.vision.RunningMode

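# VIDEO mode tracks faces across frames and requires a monotonically
# increasing timestamp with each detect_for_video() call.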
options = FaceLandmarkerOptions(
    base_options=BaseOptions(model_asset_path=model_path),
    running_mode=VisionRunningMode.VIDEO)

# Drawing helper adapted from
# https://github.com/googlesamples/mediapipe/blob/main/examples/face_landmarker/python/%5BMediaPipe_Python_Tasks%5D_Face_Landmarker.ipynb


def draw_landmarks_on_image(rgb_image, detection_result):
    face_landmarks_list = detection_result.face_landmarks

    # Loop through the detected faces to visualize.
    for face_landmarks in face_landmarks_list:
        # Convert the task result into the proto format that
        # drawing_utils expects.
        face_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
        face_landmarks_proto.landmark.extend([
            landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z)
            for landmark in face_landmarks
        ])

        # Draw the face landmarks in place: mesh tesselation, contours,
        # then irises.
        solutions.drawing_utils.draw_landmarks(
            image=rgb_image,
            landmark_list=face_landmarks_proto,
            connections=mp.solutions.face_mesh.FACEMESH_TESSELATION,
            landmark_drawing_spec=None,
            connection_drawing_spec=mp.solutions.drawing_styles
            .get_default_face_mesh_tesselation_style())
        solutions.drawing_utils.draw_landmarks(
            image=rgb_image,
            landmark_list=face_landmarks_proto,
            connections=mp.solutions.face_mesh.FACEMESH_CONTOURS,
            landmark_drawing_spec=None,
            connection_drawing_spec=mp.solutions.drawing_styles
            .get_default_face_mesh_contours_style())
        solutions.drawing_utils.draw_landmarks(
            image=rgb_image,
            landmark_list=face_landmarks_proto,
            connections=mp.solutions.face_mesh.FACEMESH_IRISES,
            landmark_drawing_spec=None,
            connection_drawing_spec=mp.solutions.drawing_styles
            .get_default_face_mesh_iris_connections_style())


async def room() -> None:
    room = livekit.Room()
    await room.connect(URL, TOKEN)
    print(f"connected to room: {room.name}")

    video_stream = None

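    # Subscribe to remote video tracks and forward their frames to the
    # display loop.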
    @room.on("track_subscribed")
    def on_track_subscribed(track: livekit.Track,
                            publication: livekit.RemoteTrackPublication,
                            participant: livekit.RemoteParticipant):
        if track.kind == livekit.TrackKind.KIND_VIDEO:
            nonlocal video_stream
            video_stream = livekit.VideoStream(track)

            @video_stream.on("frame_received")
            def on_video_frame(frame: livekit.VideoFrame):
                # Keep this callback cheap: just queue the frame for
                # the display thread.
                frame_queue.put(frame)

    await room.run()


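# The display loop is a plain blocking function run on the main thread,
# since OpenCV GUI calls are generally only safe from the main thread.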
def display_frames() -> None:
    cv2.namedWindow('livekit_video', cv2.WINDOW_AUTOSIZE)
    cv2.startWindowThread()

    global argb_frame

    with FaceLandmarker.create_from_options(options) as landmarker:
        while True:
            frame = frame_queue.get()
            buffer = frame.buffer

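            # FORMAT_ABGR follows libyuv's word-order naming, i.e. bytes
            # R,G,B,A in memory, which is why the array below is treated
            # as RGBA.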
            # (Re)allocate the conversion buffer only when the frame
            # size changes.
            if argb_frame is None or argb_frame.width != buffer.width or argb_frame.height != buffer.height:
                argb_frame = livekit.ArgbFrame(
                    livekit.VideoFormatType.FORMAT_ABGR, buffer.width, buffer.height)

            buffer.to_argb(argb_frame)

            # Wrap the raw buffer as a numpy array and drop the alpha
            # channel, since MediaPipe expects SRGB input.
            arr = np.ctypeslib.as_array(argb_frame.data)
            arr = arr.reshape((argb_frame.height, argb_frame.width, 4))
            arr = cv2.cvtColor(arr, cv2.COLOR_RGBA2RGB)

            mp_image = mp.Image(
                image_format=mp.ImageFormat.SRGB, data=arr)

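            # detect_for_video() expects a millisecond timestamp that
            # increases monotonically; the frame's capture timestamp is
            # assumed to satisfy this.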
            detection_result = landmarker.detect_for_video(
                mp_image, frame.timestamp)

            draw_landmarks_on_image(arr, detection_result)

            # OpenCV renders BGR, so convert back before displaying.
            arr = cv2.cvtColor(arr, cv2.COLOR_RGB2BGR)

            cv2.imshow('livekit_video', arr)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    cv2.destroyAllWindows()


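# Run the LiveKit room on a separate thread (with its own event loop) so
# that the blocking OpenCV display loop can own the main thread.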
async def main() -> None:
    loop = asyncio.get_running_loop()
    future = loop.run_in_executor(None, asyncio.run, room())

    display_frames()
    await future


if __name__ == "__main__":
    asyncio.run(main())