|
1 | | -__all__ = ("run", "quit", ) |
| 1 | +__all__ = ("run", "quit", "run_and_record", ) |
2 | 2 |
|
3 | 3 | import pygame |
4 | 4 | import asyncpygame as ap |
@@ -43,3 +43,81 @@ def run(main_func, *, fps=30, auto_quit=True): |
43 | 43 | raise ap.ExceptionGroup(group.message, unignorable_excs) |
44 | 44 | finally: |
45 | 45 | main_task.cancel() |
| 46 | + |
| 47 | + |
| 48 | +def run_and_record(main_func, *, fps=30, auto_quit=True, output_file="./output.mkv", overwrite=False, codec='libx265', |
| 49 | + quality=0): |
| 50 | + ''' |
| 51 | + Runs the program while recording the screen to a video file using ffmpeg. |
| 52 | + Requires numpy. |
| 53 | + ''' |
| 54 | + import subprocess |
| 55 | + from numpy import copyto as numpy_copyto |
| 56 | + from pygame.surfarray import pixels3d |
| 57 | + |
| 58 | + clock = ap.Clock() |
| 59 | + sdlevent = ap.SDLEvent() |
| 60 | + executor = ap.PriorityExecutor() |
| 61 | + main_task = ap.start(main_func(clock=clock, sdlevent=sdlevent, executor=executor)) |
| 62 | + screen = pygame.display.get_surface() |
| 63 | + |
| 64 | + if auto_quit: |
| 65 | + sdlevent.subscribe((pygame.QUIT, ), quit, priority=0) |
| 66 | + sdlevent.subscribe((pygame.KEYDOWN, ), lambda e, K=pygame.K_ESCAPE: e.key == K and quit(), priority=0) |
| 67 | + |
| 68 | + ffmpeg_cmd = ( |
| 69 | + 'ffmpeg', |
| 70 | + '-y' if overwrite else '-n', |
| 71 | + '-f', 'rawvideo', |
| 72 | + '-codec:v', 'rawvideo', |
| 73 | + '-pixel_format', 'rgb24', |
| 74 | + '-video_size', f'{screen.width}x{screen.height}', |
| 75 | + '-framerate', str(fps), |
| 76 | + '-i', '-', # stdin as the input source |
| 77 | + '-an', # no audio |
| 78 | + '-codec:v', codec, |
| 79 | + '-qscale:v', str(quality), |
| 80 | + output_file, |
| 81 | + ) |
| 82 | + process = subprocess.Popen(ffmpeg_cmd, stdin=subprocess.PIPE, bufsize=0) |
| 83 | + output_buffer = _create_output_buffer_for_surface(screen) |
| 84 | + |
| 85 | + # LOAD_FAST |
| 86 | + pygame_event_get = pygame.event.get |
| 87 | + clock_tick = clock.tick |
| 88 | + sdlevent_dispatch = sdlevent.dispatch |
| 89 | + process_stdin_write = process.stdin.write |
| 90 | + screen_lock = screen.lock |
| 91 | + screen_unlock = screen.unlock |
| 92 | + |
| 93 | + try: |
| 94 | + dt = 1000.0 / fps |
| 95 | + while True: |
| 96 | + for event in pygame_event_get(): |
| 97 | + sdlevent_dispatch(event) |
| 98 | + clock_tick(dt) |
| 99 | + executor() |
| 100 | + |
| 101 | + screen_lock() |
| 102 | + frame = pixels3d(screen).transpose((1, 0, 2)) # 高さ 幅 画素 の順にする |
| 103 | + numpy_copyto(output_buffer, frame) |
| 104 | + process_stdin_write(output_buffer) |
| 105 | + del frame |
| 106 | + screen_unlock() |
| 107 | + except AppQuit: |
| 108 | + pass |
| 109 | + except ap.ExceptionGroup as group: |
| 110 | + unignorable_excs = tuple(e for e in group.exceptions if not isinstance(e, AppQuit)) |
| 111 | + if unignorable_excs: |
| 112 | + raise ap.ExceptionGroup(group.message, unignorable_excs) |
| 113 | + finally: |
| 114 | + main_task.cancel() |
| 115 | + process.stdin.close() |
| 116 | + process.wait() |
| 117 | + |
| 118 | + |
| 119 | +def _create_output_buffer_for_surface(surface: pygame.Surface): |
| 120 | + from pygame.surfarray import pixels3d |
| 121 | + import numpy |
| 122 | + s = pixels3d(surface).shape |
| 123 | + return numpy.empty((s[1], s[0], s[2], ), dtype='uint8') # 高さ 幅 画素 の順にする |
0 commit comments