-
Notifications
You must be signed in to change notification settings - Fork 652
/
Copy pathslack_alert.py
79 lines (65 loc) · 2.21 KB
/
slack_alert.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
"""
Slack notification module for sending messages to Slack channels.
"""
import logging
from typing import Optional
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
logger = logging.getLogger(__name__)
# Global variables for Slack configuration
_slack_token: Optional[str] = None
_slack_channel: Optional[str] = None
_slack_client: Optional[WebClient] = None
def init_slack(token: str, channel: str) -> None:
"""
Initialize Slack configuration.
Args:
token: Slack bot token
channel: Default Slack channel ID or name
Raises:
ValueError: If token or channel is empty
"""
global _slack_token, _slack_channel, _slack_client
_slack_token = token
_slack_channel = channel
_slack_client = WebClient(token=token)
def send_slack_message(
message: str,
blocks: Optional[list] = None,
attachments: Optional[list] = None,
thread_ts: Optional[str] = None,
channel: Optional[str] = None,
):
"""
Send a message to a Slack channel.
Args:
message: The message text to send
blocks: Optional blocks for rich message formatting (see Slack Block Kit)
attachments: Optional attachments for the message
thread_ts: Optional thread timestamp to reply to a thread
channel: Optional channel override. If not provided, uses the default channel
Raises:
RuntimeError: If slack is not initialized
SlackApiError: If the message fails to send
"""
if not _slack_client or not _slack_channel:
# Write the input message to the log and return
logger.info("Slack not initialized")
logger.info(message)
if blocks:
logger.info(blocks)
if attachments:
logger.info(attachments)
return
try:
response = _slack_client.chat_postMessage(
channel=channel or _slack_channel,
text=message,
blocks=blocks,
attachments=attachments,
thread_ts=thread_ts,
)
logger.info(f"Message sent successfully to channel {channel or _slack_channel}")
return response
except SlackApiError as e:
logger.error(f"Failed to send Slack message: {str(e)}")