diff --git a/app/core/recording/stream_manager.py b/app/core/recording/stream_manager.py index 6e1dd17..e6f4411 100644 --- a/app/core/recording/stream_manager.py +++ b/app/core/recording/stream_manager.py @@ -31,7 +31,6 @@ def __init__(self, app, recording, recording_info): self.recording = recording self.recording_info = recording_info self.subprocess_start_info = app.subprocess_start_up_info - self.should_stop = False # manually stopped self.user_config = self.settings.user_config self.account_config = self.settings.accounts_config @@ -47,8 +46,6 @@ def __init__(self, app, recording, recording_info): self.save_format = self._get_info("save_format", default=self.DEFAULT_SAVE_FORMAT).lower() self.proxy = self.is_use_proxy() self.direct_downloader = None - self.min_valid_recording_duration = 25 - self.recording_start_time = 0 os.makedirs(self.output_dir, exist_ok=True) self.app.language_manager.add_observer(self) self._ = {} @@ -229,21 +226,6 @@ async def start_recording(self, stream_info: StreamData): record_url = self._get_record_url(stream_info) self.set_preview_url(stream_info) - try: - if self.recording.rec_id in self.app.record_manager.active_recorders: - old_recorder = self.app.record_manager.active_recorders[self.recording.rec_id] - logger.warning( - f"Found existing recorder instance for {self.recording.rec_id}, id: {id(old_recorder)}, stopping it" - ) - old_recorder.request_stop() - - await asyncio.sleep(1) - - self.app.record_manager.active_recorders[self.recording.rec_id] = self - logger.info(f"Saved recorder instance for {self.recording.rec_id}, id: {id(self)}") - except Exception as e: - logger.error(f"Failed to save recorder instance: {e}") - if use_direct_download: logger.info(f"Use Direct Downloader to Download FLV Stream: {record_url}") headers = {} @@ -289,24 +271,6 @@ async def start_recording(self, stream_info: StreamData): self.user_config.get("custom_script_command") ) - async def remove_active_recorder(self): - try: - if self.recording.rec_id in self.app.record_manager.active_recorders: - del self.app.record_manager.active_recorders[self.recording.rec_id] - logger.info(f"Removed recorder from active_recorders: {self.recording.rec_id}") - except Exception as e: - logger.error(f"Failed to remove recorder instance: {e}") - - async def recheck_live_status(self): - if not self.should_stop: - # not manually stopped - recording_duration = time.time() - self.recording_start_time - if recording_duration > self.min_valid_recording_duration: - if self.app.recording_enabled and not self.is_flv_preferred_platform: - self.app.page.run_task(self.app.record_manager.check_if_live, self.recording) - else: - self.recording.status_info = RecordingStatus.RECORDING_ERROR - async def start_ffmpeg( self, record_name: str, @@ -320,9 +284,6 @@ async def start_ffmpeg( The child process executes ffmpeg for recording """ - logger.info(f"Starting ffmpeg recording - recorder id: {id(self)}, rec_id: {self.recording.rec_id}") - self.should_stop = False - try: save_file_path = ffmpeg_command[-1] @@ -339,41 +300,34 @@ async def start_ffmpeg( self.recording.record_url = record_url logger.info(f"Recording in Progress: {live_url}") logger.log("STREAM", f"Recording Stream URL: {record_url}") - self.recording_start_time = time.time() while True: - if self.should_stop or self.recording.force_stop or not self.app.recording_enabled: + if not self.recording.is_recording or not self.app.recording_enabled: logger.info(f"Preparing to End Recording: {live_url}") - await self.remove_active_recorder() - self.recording.is_recording = False - try: - if os.name == "nt": - if process.stdin: - process.stdin.write(b"q") - await process.stdin.drain() - await asyncio.sleep(5) - else: - import signal - process.send_signal(signal.SIGINT) - # process.terminate() - await asyncio.sleep(5) + if os.name == "nt": if process.stdin: - process.stdin.close() + process.stdin.write(b"q") + await process.stdin.drain() + await asyncio.sleep(5) + else: + import signal + process.send_signal(signal.SIGINT) + # process.terminate() + await asyncio.sleep(5) + + if process.stdin: + process.stdin.close() + try: await asyncio.wait_for(process.wait(), timeout=15.0) except asyncio.TimeoutError: logger.warning(f"FFmpeg process did not exit gracefully, forcing termination: {live_url}") process.kill() await process.wait() - self.recording.force_stop = False - break - if process.returncode is not None: logger.info(f"Exit loop recording (normal 0 | abnormal 1): code={process.returncode}, {live_url}") - await self.remove_active_recorder() - self.recording.is_recording = False break await asyncio.sleep(1) @@ -381,51 +335,46 @@ async def start_ffmpeg( return_code = process.returncode safe_return_code = [0, 255] stdout, stderr = await process.communicate() - if return_code not in safe_return_code and stderr: - if not self.recording.is_recording: - logger.error(f"FFmpeg Stderr Output: {str(stderr.decode()).splitlines()[0]}") - self.recording.status_info = RecordingStatus.RECORDING_ERROR + logger.error(f"FFmpeg Stderr Output: {str(stderr.decode()).splitlines()[0]}") + self.recording.status_info = RecordingStatus.RECORDING_ERROR - try: - self.app.record_manager.stop_recording(self.recording) - await self.app.record_card_manager.update_card(self.recording) - self.app.page.pubsub.send_others_on_topic("update", self.recording) - await self.app.snack_bar.show_snack_bar( - record_name + " " + self._["record_stream_error"], duration=2000 - ) - except Exception as e: - logger.debug(f"Failed to update UI: {e}") + try: + self.app.record_manager.stop_recording(self.recording) + await self.app.record_card_manager.update_card(self.recording) + self.app.page.pubsub.send_others_on_topic("update", self.recording) + await self.app.snack_bar.show_snack_bar( + record_name + " " + self._["record_stream_error"], duration=2000 + ) + except Exception as e: + logger.debug(f"Failed to update UI: {e}") if return_code in safe_return_code: - self.recording.is_live = False - if not self.recording.is_recording: - if self.recording.monitor_status: - self.recording.status_info = RecordingStatus.MONITORING - display_title = self.recording.title - else: - self.recording.status_info = RecordingStatus.STOPPED_MONITORING - display_title = self.recording.display_title + if self.recording.monitor_status: + self.recording.status_info = RecordingStatus.MONITORING + display_title = self.recording.title + else: + self.recording.status_info = RecordingStatus.STOPPED_MONITORING + display_title = self.recording.display_title - self.recording.live_title = None - if self.should_stop: - logger.success(f"Live recording has stopped: {record_name}") - else: - logger.success(f"Live recording completed: {record_name}") - self.app.page.run_task(self.end_message_push) - - try: - self.recording.update({"display_title": display_title}) - self.app.page.run_task(self.app.record_card_manager.update_card, self.recording) - self.app.page.pubsub.send_others_on_topic("update", self.recording) - except Exception as e: - logger.debug(f"Failed to update UI: {e}") + self.recording.live_title = None + if not self.recording.is_recording: + logger.success(f"Live recording has stopped: {record_name}") + else: - if not self.app.recording_enabled: - self.recording.status_info = RecordingStatus.NOT_RECORDING_SPACE - self.app.page.run_task(self.stop_recording_notify) + logger.success(f"Live recording completed: {record_name}") + self.app.page.run_task(self.end_message_push) + self.recording.is_recording = False + try: + self.recording.update({"display_title": display_title}) + self.app.page.run_task(self.app.record_card_manager.update_card, self.recording) + self.app.page.pubsub.send_others_on_topic("update", self.recording) + if not self.app.recording_enabled: + self.recording.status_info = RecordingStatus.NOT_RECORDING_SPACE + self.app.page.run_task(self.stop_recording_notify) - await self.recheck_live_status() + except Exception as e: + logger.debug(f"Failed to update UI: {e}") if self.user_config.get("convert_to_mp4") and self.save_format == "ts": if self.segment_record: @@ -473,6 +422,8 @@ async def start_ffmpeg( self.user_config.get("convert_to_mp4") ) + if self.app.recording_enabled and not self.is_flv_preferred_platform: + self.app.page.run_task(self.app.record_manager.check_if_live, self.recording) except Exception as e: logger.error(f"An error occurred during the subprocess execution: {e}") self.recording.status_info = RecordingStatus.RECORDING_ERROR @@ -583,9 +534,8 @@ async def custom_script_execute( params = [ f'--record_name "{record_name}"', f'--save_file_path "{save_file_path}"', - f'--save_type {save_type}', - f'--split_video_by_time {split_video_by_time}', - f'--converts_to_mp4 {converts_to_mp4}', + f"--save_type {save_type}--split_video_by_time {split_video_by_time}", + f"--converts_to_mp4 {converts_to_mp4}" ] else: params = [ @@ -672,10 +622,6 @@ async def start_direct_download( """ Use the direct downloader to download the live stream """ - - logger.info(f"Starting direct download - recorder id: {id(self)}, rec_id: {self.recording.rec_id}") - self.should_stop = False - try: await self.direct_downloader.start_download() @@ -683,15 +629,11 @@ async def start_direct_download( self.recording.record_url = record_url logger.info(f"Direct Downloading: {live_url}") logger.log("STREAM", f"Direct Download Stream URL: {record_url}") - self.recording_start_time = time.time() while True: - if self.should_stop or self.recording.force_stop or not self.app.recording_enabled: + if not self.recording.is_recording or not self.app.recording_enabled: logger.info(f"Prepare to end direct download: {live_url}") - await self.remove_active_recorder() - self.recording.is_recording = False await self.direct_downloader.stop_download() - self.recording.force_stop = False break await asyncio.sleep(1) @@ -699,37 +641,33 @@ async def start_direct_download( if self.direct_downloader.download_task and self.direct_downloader.download_task.done(): break - await self.remove_active_recorder() - self.recording.is_recording = False + if self.recording.monitor_status: + self.recording.status_info = RecordingStatus.MONITORING + display_title = self.recording.title + else: + self.recording.status_info = RecordingStatus.STOPPED_MONITORING + display_title = self.recording.display_title + self.recording.live_title = None if not self.recording.is_recording: - self.recording.is_live = False - if self.recording.monitor_status: - self.recording.status_info = RecordingStatus.MONITORING - display_title = self.recording.title - else: - self.recording.status_info = RecordingStatus.STOPPED_MONITORING - display_title = self.recording.display_title - - self.recording.live_title = None - if self.should_stop: - logger.success(f"Direct Downloading Stopped: {record_name}") - else: - logger.success(f"Direct Downloading Completed: {record_name}") - self.app.page.run_task(self.end_message_push) - - try: - self.recording.update({"display_title": display_title}) - await self.app.record_card_manager.update_card(self.recording) - self.app.page.pubsub.send_others_on_topic("update", self.recording) - except Exception as e: - logger.debug(f"Failed to update UI: {e}") + logger.success(f"Direct Downloading Stopped: {record_name}") + else: + logger.success(f"Direct Downloading Completed: {record_name}") + self.app.page.run_task(self.end_message_push) + self.recording.is_recording = False + if self.app.recording_enabled and not self.is_flv_preferred_platform: + self.app.page.run_task(self.app.record_manager.check_if_live, self.recording) - if not self.app.recording_enabled: - self.recording.status_info = RecordingStatus.NOT_RECORDING_SPACE - self.app.page.run_task(self.stop_recording_notify) + try: + self.recording.update({"display_title": display_title}) + await self.app.record_card_manager.update_card(self.recording) + self.app.page.pubsub.send_others_on_topic("update", self.recording) + if not self.app.recording_enabled: + self.recording.status_info = RecordingStatus.NOT_RECORDING_SPACE + self.app.page.run_task(self.stop_recording_notify) - await self.recheck_live_status() + except Exception as e: + logger.debug(f"Failed to update UI: {e}") if self.user_config.get("execute_custom_script") and script_command: logger.info("Prepare to execute custom script in the background") @@ -786,28 +724,55 @@ async def end_message_push(self): msg_manager = message_pusher.MessagePusher(self.settings) user_config = self.settings.user_config - if (self.app.recording_enabled and msg_manager.should_push_message( - self.settings, self.recording, check_manually_stopped=True, message_type='end') and - not self.recording.notified_live_end): - self.recording.notified_live_end = True - push_content = self._["push_content_end"] - end_push_message_text = user_config.get("custom_stream_end_content") - if end_push_message_text: - push_content = end_push_message_text - - push_at = datetime.today().strftime("%Y-%m-%d %H:%M:%S") - push_content = push_content.replace("[room_name]", self.recording.streamer_name).replace( - "[time]", push_at).replace("[title]", self.recording.live_title or "None") - msg_title = user_config.get("custom_notification_title").strip() - msg_title = msg_title or self._["status_notify"] - - self.app.page.run_task(msg_manager.push_messages, msg_title, push_content) - - def request_stop(self): - logger.info(f"Stop requested for recorder: {self.recording.url}, rec_id: {self.recording.rec_id}") - logger.info(f"Recorder instance details - id: {id(self)}, recording: {self.recording.title}") - - old_value = self.should_stop - self.should_stop = True - - logger.info(f"Set should_stop from {old_value} to {self.should_stop} for recorder: {self.recording.rec_id}") + if not self.app.recording_enabled: + return + + if self.recording.notified_live_end: + return + + try: + delay_seconds = int(user_config.get("stream_end_notification_delay_seconds") or 0) + except (TypeError, ValueError): + delay_seconds = 0 + delay_seconds = max(0, delay_seconds) + + if delay_seconds > 0: + if getattr(self.recording, "pending_live_end_notification", False): + return + self.recording.pending_live_end_notification = True + try: + await asyncio.sleep(delay_seconds) + finally: + self.recording.pending_live_end_notification = False + + if self.recording.is_live: + return + + if not msg_manager.should_push_message( + self.settings, self.recording, check_manually_stopped=True, message_type="end" + ): + return + + if self.recording.notified_live_end: + return + + push_content = self._["push_content_end"] + end_push_message_text = user_config.get("custom_stream_end_content") + if end_push_message_text: + push_content = end_push_message_text + + push_at = datetime.today().strftime("%Y-%m-%d %H:%M:%S") + push_content = push_content.replace("[room_name]", self.recording.streamer_name).replace( + "[time]", push_at + ) + msg_title = user_config.get("custom_notification_title").strip() + msg_title = msg_title or self._["status_notify"] + + self.recording.notified_live_end = True + self.app.page.run_task( + msg_manager.push_messages, + msg_title, + push_content, + "end", + {"room_name": self.recording.streamer_name, "url": self.recording.url}, + )