Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 95 additions & 31 deletions app/messages/message_pusher.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import asyncio
from typing import Optional

from ..models.recording.recording_model import Recording
from ..ui.views.settings_view import SettingsPage
from ..utils.logger import logger
from .notification_service import NotificationService
from datetime import datetime


class MessagePusher:
Expand All @@ -16,30 +16,27 @@ def _get_proxy(self) -> str | None:
if self.settings.user_config.get("enable_proxy"):
return self.settings.user_config.get("proxy_address")

@staticmethod
def _get_push_channels() -> list[str]:
return [
def is_any_push_channel_enabled(self) -> bool:
"""Check if any push channel is enabled"""
push_channels = [
"dingtalk_enabled",
"wechat_enabled",
"feishu_enabled",
"bark_enabled",
"ntfy_enabled",
"telegram_enabled",
"feishu_enabled",
"email_enabled",
"serverchan_enabled",
"serverchan_enabled"
]

def is_any_push_channel_enabled(self) -> bool:
"""Check if any push channel is enabled"""
push_channels = self._get_push_channels()

return any(self.settings.user_config.get(channel) for channel in push_channels)

@staticmethod
def should_push_message(
settings: SettingsPage,
recording: Recording,
check_manually_stopped: bool = False,
message_type: Optional[str] = None
settings: SettingsPage,
recording: Recording,
check_manually_stopped: bool = False,
message_type: Optional[str] = None
) -> bool:
"""
Check if message should be pushed
Expand All @@ -51,7 +48,7 @@ def should_push_message(
should_only_notify_no_record = user_config.get("only_notify_no_record")
is_stream_start_enabled = user_config.get("stream_start_notification_enabled")
is_stream_end_enabled = user_config.get("stream_end_notification_enabled")

if message_type is None:
if hasattr(recording, 'is_recording') and recording.is_recording:
message_type = 'end'
Expand All @@ -67,7 +64,17 @@ def should_push_message(
if message_type == 'end' and not is_stream_end_enabled:
return False

push_channels = MessagePusher._get_push_channels()
push_channels = [
"dingtalk_enabled",
"wechat_enabled",
"bark_enabled",
"ntfy_enabled",
"telegram_enabled",
"feishu_enabled",
"email_enabled",
"serverchan_enabled"
]

any_channel_enabled = any(user_config.get(channel) for channel in push_channels)

if not any_channel_enabled:
Expand All @@ -84,16 +91,54 @@ def log_push_result(service_name: str, result: dict) -> None:
if result.get("error") or (not result.get("success") and not result.get("error")):
logger.error(f"Push {service_name} message failed: {result['error']}")

def push_messages_sync(self, msg_title: str, push_content: str) -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(self.push_messages(msg_title, push_content))
finally:
loop.close()
async def _append_notification_history(
self,
msg_title: str,
push_content: str,
message_type: str | None,
meta: dict | None,
results: dict,
) -> None:
if self.settings.user_config.get("notification_history_enabled") is False:
return

history = self.settings.user_config.get("notification_history")
if not isinstance(history, list):
history = []

async def push_messages(self, msg_title: str, push_content: str) -> None:
try:
max_entries = int(self.settings.user_config.get("notification_history_max_entries") or 200)
except (TypeError, ValueError):
max_entries = 200

entry = {
"time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"type": message_type or "unknown",
"title": msg_title,
"content": push_content,
"meta": meta or {},
"results": results,
}

history.insert(0, entry)
if max_entries > 0:
history = history[:max_entries]

self.settings.user_config["notification_history"] = history
try:
await self.settings.config_manager.save_user_config(self.settings.user_config)
except Exception as e:
logger.debug(f"Save notification history failed: {e}")

async def push_messages(
self,
msg_title: str,
push_content: str,
message_type: str | None = None,
meta: dict | None = None,
) -> None:
"""Push messages to all enabled notification services"""
results: dict[str, dict] = {}
if self.settings.user_config.get("dingtalk_enabled"):
result = await self.notifier.send_to_dingtalk(
url=self.settings.user_config.get("dingtalk_webhook_url"),
Expand All @@ -102,12 +147,14 @@ async def push_messages(self, msg_title: str, push_content: str) -> None:
is_atall=self.settings.user_config.get("dingtalk_at_all"),
)
self.log_push_result("DingTalk", result)
results["DingTalk"] = result

if self.settings.user_config.get("wechat_enabled"):
result = await self.notifier.send_to_wechat(
url=self.settings.user_config.get("wechat_webhook_url"), title=msg_title, content=push_content
)
self.log_push_result("Wechat", result)
results["Wechat"] = result

if self.settings.user_config.get("bark_enabled"):
result = await self.notifier.send_to_bark(
Expand All @@ -118,6 +165,7 @@ async def push_messages(self, msg_title: str, push_content: str) -> None:
sound=self.settings.user_config.get("bark_sound"),
)
self.log_push_result("Bark", result)
results["Bark"] = result

if self.settings.user_config.get("ntfy_enabled"):
result = await self.notifier.send_to_ntfy(
Expand All @@ -129,6 +177,7 @@ async def push_messages(self, msg_title: str, push_content: str) -> None:
email=self.settings.user_config.get("ntfy_email"),
)
self.log_push_result("Ntfy", result)
results["Ntfy"] = result

if self.settings.user_config.get("telegram_enabled"):
result = await self.notifier.send_to_telegram(
Expand All @@ -138,6 +187,18 @@ async def push_messages(self, msg_title: str, push_content: str) -> None:
proxy=self._get_proxy(),
)
self.log_push_result("Telegram", result)
results["Telegram"] = result

if self.settings.user_config.get("feishu_enabled"):
result = await self.notifier.send_to_feishu(
url=self.settings.user_config.get("feishu_webhook_url"),
title=msg_title,
content=push_content,
msg_type=self.settings.user_config.get("feishu_msg_type", "text"),
sign_secret=self.settings.user_config.get("feishu_sign_secret"),
)
self.log_push_result("Feishu", result)
results["Feishu"] = result

if self.settings.user_config.get("email_enabled"):
result = await self.notifier.send_to_email(
Expand All @@ -151,6 +212,7 @@ async def push_messages(self, msg_title: str, push_content: str) -> None:
content=push_content,
)
self.log_push_result("Email", result)
results["Email"] = result

if self.settings.user_config.get("serverchan_enabled"):
result = await self.notifier.send_to_serverchan(
Expand All @@ -161,10 +223,12 @@ async def push_messages(self, msg_title: str, push_content: str) -> None:
tags=self.settings.user_config.get("serverchan_tags", "Live Status Update"),
)
self.log_push_result("ServerChan", result)

if self.settings.user_config.get("feishu_enabled"):
result = await self.notifier.send_to_feishu(
url=self.settings.user_config.get("feishu_webhook_url"),
content=push_content,
)
self.log_push_result("Feishu", result)
results["ServerChan"] = result

await self._append_notification_history(
msg_title=msg_title,
push_content=push_content,
message_type=message_type,
meta=meta,
results=results,
)
Loading