diff --git a/app/core/recording/record_manager.py b/app/core/recording/record_manager.py index 5cebbc8..4f8cda7 100644 --- a/app/core/recording/record_manager.py +++ b/app/core/recording/record_manager.py @@ -9,7 +9,6 @@ from ...utils import utils from ...utils.logger import logger from ..platforms.platform_handlers import get_platform_info -from ..runtime.process_manager import BackgroundService from .stream_manager import LiveStreamRecorder @@ -31,7 +30,6 @@ def __init__(self, app): self.initialize_dynamic_state() max_concurrent = int(self.settings.user_config.get("platform_max_concurrent_requests", 3)) self.platform_semaphores = defaultdict(lambda: asyncio.Semaphore(max_concurrent)) - self.active_recorders = {} @property def recordings(self): @@ -60,7 +58,6 @@ def initialize_dynamic_state(self): for recording in self.recordings: recording.loop_time_seconds = self.loop_time_seconds recording.update_title(self._[recording.quality]) - recording.showed_checking_status = True async def add_recording(self, recording): with GlobalRecordingState.lock: @@ -90,7 +87,7 @@ async def update_recording_card(self, recording: Recording, updated_info: dict): @staticmethod async def _update_recording( - recording: Recording, monitor_status: bool, display_title: str, status_info: str, selected: bool + recording: Recording, monitor_status: bool, display_title: str, status_info: str, selected: bool ): attrs_update = { "monitor_status": monitor_status, @@ -106,22 +103,16 @@ async def start_monitor_recording(self, recording: Recording, auto_save: bool = Start monitoring a single recording if it is not already being monitored. """ if not recording.monitor_status: - recording.is_checking = True - recording.is_live = False - recording.showed_checking_status = False await self._update_recording( recording=recording, monitor_status=True, display_title=recording.title, - status_info=RecordingStatus.STATUS_CHECKING, + status_info=RecordingStatus.MONITORING, selected=False, ) - + self.app.page.run_task(self.check_if_live, recording) self.app.page.run_task(self.app.record_card_manager.update_card, recording) self.app.page.pubsub.send_others_on_topic("update", recording) - - self.app.page.run_task(self.check_if_live, recording) - if auto_save: self.app.page.run_task(self.persist_recordings) @@ -195,206 +186,170 @@ async def check_all_live_status(self): if not recording.detection_time or is_exceeded: self.app.page.run_task(self.check_if_live, recording) - _periodic_task_running = False - - @classmethod - def is_periodic_task_running(cls): - return cls._periodic_task_running - - @classmethod - def set_periodic_task_running(cls, value=True): - cls._periodic_task_running = value - async def setup_periodic_live_check(self, interval: int = 180): """Set up a periodic task to check live status.""" async def periodic_check(): - logger.info("Starting periodic live check background task") while True: - immediate_check_on_startup = self.app.settings.user_config.get("check_live_on_browser_refresh", True) - if immediate_check_on_startup: - await asyncio.sleep(interval) + await asyncio.sleep(interval) await self.check_free_space() if self.app.recording_enabled: await self.check_all_live_status() - if not immediate_check_on_startup: - await asyncio.sleep(interval) - if not RecordingManager.is_periodic_task_running(): - RecordingManager.set_periodic_task_running(True) + if not self.periodic_task_started: self.periodic_task_started = True - logger.info(f"Initializing periodic live check task with interval: {interval}s") - asyncio.create_task(periodic_check()) - else: - logger.info("Periodic live check task already running globally, skipping initialization") + await periodic_check() async def check_if_live(self, recording: Recording): """Check if the live stream is available, fetch stream data and update is_live status.""" - recording.manually_stopped = False - if recording.is_recording or recording.stopping_in_progress: - logger.debug(f"Skip check_if_live because recording is busy: {recording.url}") - return - - if recording.rec_id in self.active_recorders: - logger.debug(f"Skip check_if_live because recorder is active: {recording.url}") + if recording.is_recording: return if not recording.monitor_status: recording.display_title = f"[{self._['monitor_stopped']}] {recording.title}" recording.status_info = RecordingStatus.STOPPED_MONITORING - recording.is_checking = False - self.app.page.run_task(self.app.record_card_manager.update_card, recording) - return - - recording.detection_time = datetime.now().time() - recording.is_checking = True - if not recording.showed_checking_status: + elif not recording.is_checking: recording.status_info = RecordingStatus.STATUS_CHECKING - recording.showed_checking_status = True - self.app.page.run_task(self.app.record_card_manager.update_card, recording) - - if recording.scheduled_recording: - scheduled_time_range_list = await self.get_scheduled_time_range( - recording.scheduled_start_time, recording.monitor_hours) - recording.scheduled_time_range = scheduled_time_range_list - in_scheduled = False - for scheduled_time_range in scheduled_time_range_list: + recording.detection_time = datetime.now().time() + if recording.scheduled_recording and recording.scheduled_start_time and recording.monitor_hours: + scheduled_time_range = await self.get_scheduled_time_range( + recording.scheduled_start_time, recording.monitor_hours) + recording.scheduled_time_range = scheduled_time_range in_scheduled = utils.is_current_time_within_range(scheduled_time_range) - if in_scheduled: - break - - if not in_scheduled: - recording.status_info = RecordingStatus.NOT_IN_SCHEDULED_CHECK - recording.is_live = False - recording.is_checking = False - logger.info(f"Skip Detection: {recording.url} not in scheduled check range {scheduled_time_range_list}") - self.app.page.run_task(self.app.record_card_manager.update_card, recording) - return - - recording.status_info = RecordingStatus.STATUS_CHECKING - platform, platform_key = get_platform_info(recording.url) + if not in_scheduled: + recording.status_info = RecordingStatus.NOT_IN_SCHEDULED_CHECK + recording.is_live = False + logger.info(f"Skip Detection: {recording.url} not in scheduled check range {scheduled_time_range}") + self.app.page.run_task(self.app.record_card_manager.update_card, recording) + return - if platform and platform_key and (recording.platform is None or recording.platform_key is None): - recording.platform = platform - recording.platform_key = platform_key - self.app.page.run_task(self.persist_recordings) + recording.is_checking = True + recording.status_info = RecordingStatus.MONITORING + platform, platform_key = get_platform_info(recording.url) - if self.settings.user_config["language"] != "zh_CN": - platform = platform_key + if platform and platform_key and (recording.platform is None or recording.platform_key is None): + recording.platform = platform + recording.platform_key = platform_key + self.app.page.run_task(self.persist_recordings) - output_dir = self.settings.get_video_save_path() - await self.check_free_space(output_dir) - if not self.app.recording_enabled: - recording.is_checking = False - recording.status_info = RecordingStatus.NOT_RECORDING_SPACE - return - recording_info = { - "platform": platform, - "platform_key": platform_key, - "live_url": recording.url, - "output_dir": output_dir, - "segment_record": recording.segment_record, - "segment_time": recording.segment_time, - "save_format": recording.record_format, - "quality": recording.quality, - } + if self.settings.user_config["language"] != "zh_CN": + platform = platform_key - semaphore = self.platform_semaphores[platform_key] - recorder = LiveStreamRecorder(self.app, recording, recording_info) - async with semaphore: - stream_info = await recorder.fetch_stream() - logger.info(f"Stream Data: {stream_info}") - if not stream_info or not stream_info.anchor_name: - logger.error(f"Fetch stream data failed: {recording.url}") - recording.is_checking = False - recording.status_info = RecordingStatus.LIVE_STATUS_CHECK_ERROR - if recording.monitor_status: - self.app.page.run_task(self.app.record_card_manager.update_card, recording) - self.app.page.pubsub.send_others_on_topic("update", recording) - return - if self.settings.user_config.get("remove_emojis"): - stream_info.anchor_name = utils.clean_name(stream_info.anchor_name, self._["live_room"]) - - if stream_info.is_live: - recording.live_title = stream_info.title - if recording.streamer_name.strip() == self._["live_room"]: - recording.streamer_name = stream_info.anchor_name - recording.title = f"{recording.streamer_name} - {self._[recording.quality]}" - recording.display_title = f"[{self._['is_live']}] {recording.title}" - - if not recording.is_live: - recording.is_live = stream_info.is_live - recording.notified_live_start = False - recording.notified_live_end = False - - if desktop_notify.should_push_notification(self.app): - desktop_notify.send_notification( - title=self._["notify"], - message=recording.streamer_name + ' | ' + self._["live_recording_started_message"], - app_icon=self.app.tray_manager.icon_path + output_dir = self.settings.get_video_save_path() + await self.check_free_space(output_dir) + if not self.app.recording_enabled: + recording.is_checking = False + recording.status_info = RecordingStatus.NOT_RECORDING_SPACE + return + recording_info = { + "platform": platform, + "platform_key": platform_key, + "live_url": recording.url, + "output_dir": output_dir, + "segment_record": recording.segment_record, + "segment_time": recording.segment_time, + "save_format": recording.record_format, + "quality": recording.quality, + } + + semaphore = self.platform_semaphores[platform_key] + recorder = LiveStreamRecorder(self.app, recording, recording_info) + async with semaphore: + stream_info = await recorder.fetch_stream() + logger.info(f"Stream Data: {stream_info}") + if not stream_info or not stream_info.anchor_name: + logger.error(f"Fetch stream data failed: {recording.url}") + recording.is_checking = False + recording.status_info = RecordingStatus.LIVE_STATUS_CHECK_ERROR + if recording.monitor_status: + self.app.page.run_task(self.app.record_card_manager.update_card, recording) + return + if self.settings.user_config.get("remove_emojis"): + stream_info.anchor_name = utils.clean_name(stream_info.anchor_name, self._["live_room"]) + + if stream_info.is_live: + recording.live_title = stream_info.title + if recording.streamer_name.strip() == self._["live_room"]: + recording.streamer_name = stream_info.anchor_name + recording.title = f"{recording.streamer_name} - {self._[recording.quality]}" + recording.display_title = f"[{self._['is_live']}] {recording.title}" + + if not recording.is_live: + recording.is_live = stream_info.is_live + recording.notified_live_start = False + recording.notified_live_end = False + + if desktop_notify.should_push_notification(self.app): + desktop_notify.send_notification( + title=self._["notify"], + message=recording.streamer_name + ' | ' + self._["live_recording_started_message"], + app_icon=self.app.tray_manager.icon_path + ) + + msg_manager = message_pusher.MessagePusher(self.settings) + user_config = self.settings.user_config + if (msg_manager.should_push_message(self.settings, recording, message_type='start') + and not recording.notified_live_start): + push_content = self._["push_content"] + begin_push_message_text = user_config.get("custom_stream_start_content") + if begin_push_message_text: + push_content = begin_push_message_text + + push_at = datetime.today().strftime("%Y-%m-%d %H:%M:%S") + push_content = push_content.replace("[room_name]", recording.streamer_name).replace( + "[time]", push_at ) + msg_title = user_config.get("custom_notification_title").strip() + msg_title = msg_title or self._["status_notify"] + + self.app.page.run_task( + msg_manager.push_messages, + msg_title, + push_content, + "start", + {"room_name": recording.streamer_name, "url": recording.url}, + ) + recording.notified_live_start = True - msg_manager = message_pusher.MessagePusher(self.settings) - user_config = self.settings.user_config - if (msg_manager.should_push_message(self.settings, recording, message_type='start') - and not recording.notified_live_start): - push_content = self._["push_content"] - begin_push_message_text = user_config.get("custom_stream_start_content") - if begin_push_message_text: - push_content = begin_push_message_text - - push_at = datetime.today().strftime("%Y-%m-%d %H:%M:%S") - push_content = push_content.replace("[room_name]", recording.streamer_name).replace( - "[time]", push_at).replace("[title]", recording.live_title or "None") - msg_title = user_config.get("custom_notification_title").strip() - msg_title = msg_title or self._["status_notify"] - - BackgroundService.get_instance().add_task( - msg_manager.push_messages_sync, msg_title, push_content - ) - recording.notified_live_start = True - - if not recording.only_notify_no_record: - recording.status_info = RecordingStatus.PREPARING_RECORDING - recording.loop_time_seconds = self.loop_time_seconds - self.start_update(recording) - self.app.page.run_task(recorder.start_recording, stream_info) - else: - if recording.notified_live_start: - notify_loop_time = user_config.get("notify_loop_time") - recording.loop_time_seconds = int(notify_loop_time or 600) - else: + if not recording.only_notify_no_record: + recording.status_info = RecordingStatus.PREPARING_RECORDING recording.loop_time_seconds = self.loop_time_seconds + self.start_update(recording) + self.app.page.run_task(recorder.start_recording, stream_info) + else: + if recording.notified_live_start: + notify_loop_time = user_config.get("notify_loop_time") + recording.loop_time_seconds = int(notify_loop_time or 600) + else: + recording.loop_time_seconds = self.loop_time_seconds - recording.cumulative_duration = timedelta() - recording.last_duration = timedelta() - recording.status_info = RecordingStatus.LIVE_BROADCASTING + recording.cumulative_duration = timedelta() + recording.last_duration = timedelta() + recording.status_info = RecordingStatus.LIVE_BROADCASTING - else: - recording.is_recording = False - if recording.is_live: - recording.is_live = False - self.app.page.run_task(recorder.end_message_push) - - recording.status_info = RecordingStatus.MONITORING - title = f"{stream_info.anchor_name or recording.streamer_name} - {self._[recording.quality]}" - if recording.streamer_name == self._["live_room"] or \ - f"[{self._['is_live']}]" in recording.display_title: - recording.update( - { - "streamer_name": stream_info.anchor_name, - "title": title, - "display_title": title, - } - ) - self.app.page.run_task(self.persist_recordings) + else: + if recording.is_live: + recording.is_live = False + self.app.page.run_task(recorder.end_message_push) + + recording.status_info = RecordingStatus.MONITORING + title = f"{stream_info.anchor_name or recording.streamer_name} - {self._[recording.quality]}" + if recording.streamer_name == self._["live_room"] or \ + f"[{self._['is_live']}]" in recording.display_title: + recording.update( + { + "streamer_name": stream_info.anchor_name, + "title": title, + "display_title": title, + } + ) + self.app.page.run_task(self.persist_recordings) - recording.is_checking = False - self.app.page.run_task(self.app.record_card_manager.update_card, recording) - self.app.page.pubsub.send_others_on_topic("update", recording) - return + self.app.page.run_task(self.app.record_card_manager.update_card, recording) + self.app.page.pubsub.send_others_on_topic("update", recording) + recording.is_checking = False @staticmethod def start_update(recording: Recording): @@ -411,26 +366,11 @@ def start_update(recording: Recording): ) logger.info(f"Started recording for {recording.title}") - def stop_recording(self, recording: Recording, manually_stopped: bool = True): + @staticmethod + def stop_recording(recording: Recording, manually_stopped: bool = True): """Stop the recording process.""" recording.is_live = False if recording.is_recording: - - recording.stopping_in_progress = True - - logger.info(f"Trying to stop recorder for {recording.rec_id}, title: {recording.title}") - logger.debug(f"Active recorders: {list(self.active_recorders.keys())}") - - if recording.rec_id in self.active_recorders: - recorder = self.active_recorders[recording.rec_id] - logger.debug(f"Found recorder instance - id: {id(recorder)}") - recorder.request_stop() - logger.info(f"Requested stop for recorder: {recording.rec_id}") - else: - logger.warning(f"No active recorder found for {recording.rec_id}, cannot request stop") - recording.force_stop = True - logger.info(f"Set force_stop=True for recording: {recording.rec_id}") - if recording.start_time is not None: elapsed = datetime.now() - recording.start_time # Add the elapsed time to the cumulative duration. @@ -440,11 +380,8 @@ def stop_recording(self, recording: Recording, manually_stopped: bool = True): recording.start_time = None recording.is_recording = False recording.manually_stopped = manually_stopped - recording.status_info = RecordingStatus.NOT_RECORDING logger.info(f"Stopped recording for {recording.title}") - self.app.page.run_task(self._reset_stopping_flag, recording) - def get_duration(self, recording: Recording): """Get the duration of the current recording session in a formatted string.""" if recording.is_recording and recording.start_time is not None: @@ -487,20 +424,9 @@ async def check_free_space(self, output_dir: str | None = None): self.app.recording_enabled = True @staticmethod - async def get_scheduled_time_range(scheduled_start_time, monitor_hours) -> list | None: - scheduled_time_range_list = [] - for index, start_time in enumerate(scheduled_start_time.split(',')): - try: - hours = str(monitor_hours).split(',')[index] - if start_time and hours: - end_time = utils.add_hours_to_time(start_time, float(hours or 5)) - scheduled_time_range = f"{start_time}~{end_time}" - scheduled_time_range_list.append(scheduled_time_range) - except Exception: - pass - return scheduled_time_range_list - - @staticmethod - async def _reset_stopping_flag(recording: Recording): - recording.stopping_in_progress = False - logger.debug(f"Reset stopping_in_progress flag for recording: {recording.rec_id}") + async def get_scheduled_time_range(scheduled_start_time, monitor_hours) -> str | None: + monitor_hours = float(monitor_hours or 5) + if scheduled_start_time and monitor_hours: + end_time = utils.add_hours_to_time(scheduled_start_time, monitor_hours) + scheduled_time_range = f"{scheduled_start_time}~{end_time}" + return scheduled_time_range