This repository was archived by the owner on Feb 22, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathweda.py
256 lines (230 loc) · 8.82 KB
/
weda.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# -*- coding: utf-8 -*-
import os
import base64
import requests
import pytz
import datetime
import json
import fcntl
from config import ALL_RULE_FILENAME
from config import WEDA_ENV
from config import WEDA_USER_DATASOURCE
# Tencent Cloud API credentials, read from the process environment at import
# time (each is None when its variable is unset). get_access_token() joins
# them as "id:key" to build the Basic authorization header.
TENCENT_CLOUD_SECRET_ID = os.environ.get("TENCENT_CLOUD_SECRET_ID")
TENCENT_CLOUD_SECRET_KEY = os.environ.get("TENCENT_CLOUD_SECRET_KEY")
def get_access_token():
    """
    Return a WeDa API access token, requesting a new one if none is cached.

    The token is cached in the ``WEDA_ACCESS_TOKEN`` environment variable of
    the current process. When that variable is set and non-empty it is
    returned as-is and no request is made. Otherwise the Tencent Cloud secret
    id/key pair is exchanged for a token at the client-credentials endpoint
    and the result is stored in ``WEDA_ACCESS_TOKEN`` before being returned.

    :return: the access token string
    :raises Exception: if the token endpoint's reply carries no
        ``access_token``; the raw response body is used as the message
    """
    access_token = os.environ.get("WEDA_ACCESS_TOKEN")
    if access_token:
        # NOTE(review): a cached token is never re-validated here; if tokens
        # expire, something else must clear WEDA_ACCESS_TOKEN - confirm.
        return access_token

    # Exchange the secret id/key for an access token.
    token_url = (
        "https://lowcode-8gsauxtn5fe06776.ap-shanghai.tcb-api.tencentcloudapi.com"
        "/auth/v1/token/clientCredential"
    )
    credentials = f"{TENCENT_CLOUD_SECRET_ID}:{TENCENT_CLOUD_SECRET_KEY}"
    token_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Basic {base64.b64encode(credentials.encode()).decode()}",
    }
    token_data = {
        "grant_type": "client_credentials",
    }
    # Without a timeout a stalled connection would block the caller forever.
    token_response = requests.post(token_url, headers=token_headers, json=token_data, timeout=5)
    access_token = token_response.json().get("access_token")
    if not access_token:
        # Surface the server's reply instead of a bare KeyError.
        raise Exception(token_response.text)
    # The token is a bearer credential, so it is deliberately not printed.
    print("access_token refreshed")
    os.environ["WEDA_ACCESS_TOKEN"] = access_token
    return access_token
def query_data_by_filter(env_type: str, datasource_name: str, filter_str: str = None):
    """
    Fetch all records of a datasource, optionally narrowed by a filter.

    Records are requested page by page through the ``$top``/``$skip`` query
    parameters, and the ``value`` arrays of all pages are concatenated. The
    loop stops at the first page that holds fewer records than the page size.

    :param env_type: environment segment placed in the URL path
    :param datasource_name: datasource identifier placed in the URL path
    :param filter_str: expression sent as ``$filter``; when empty or None the
        request sends ``$orderby=updateBy desc`` instead
    :return: list with the records of every page, in the order received
    :raises Exception: if a reply is not a JSON object with a ``value``
        field; the raw response body is used as the message
    """
    page_size = 1000  # records requested per page
    access_token = get_access_token()

    base_url = (
        "https://lowcode-8gsauxtn5fe06776.ap-shanghai.tcb-api.tencentcloudapi.com/weda/odata/v1/batch"
        f"/{env_type}/{datasource_name}"
    )
    if filter_str:
        query_url = f"{base_url}?$filter={filter_str}"
    else:
        query_url = f"{base_url}?$orderby=updateBy desc"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {access_token}",
    }
    params = {
        "$top": page_size,   # number of records per request
        "$skip": 0,          # number of records to skip
        "$count": "true",    # ask the service for the total count
    }

    all_data = []
    while True:
        # Without a timeout a stalled connection would block the loop forever.
        response = requests.get(query_url, headers=headers, params=params, timeout=30)
        data = response.json()
        if not isinstance(data, dict) or "value" not in data:
            # Error replies carry no "value"; report what the server said,
            # matching how the other request helpers in this file fail.
            raise Exception(response.text)
        page = data["value"]
        all_data.extend(page)
        # A short page means there is nothing left to fetch.
        if len(page) < page_size:
            break
        params["$skip"] += page_size
    return all_data
def get_active_rule_list(cd_index: int):
    """
    Return the locally cached push rules that match one venue.

    The rule list is read as JSON from the file named by
    ``ALL_RULE_FILENAME`` while holding a shared lock. A rule is kept when
    its ``status`` equals ``'2'`` and its ``xjcd`` equals ``str(cd_index)``.

    :param cd_index: venue code (the original note says the mapping is kept
        in global variables)
    :return: list of the matching rules, in file order
    """
    print("getting all rules from local file...")
    # Take a shared (read) lock for the duration of the read.
    with open(ALL_RULE_FILENAME, 'r') as rule_file:
        fcntl.flock(rule_file, fcntl.LOCK_SH)
        raw_json = rule_file.read()
        fcntl.flock(rule_file, fcntl.LOCK_UN)
    # Parse the JSON text and keep only the rules for the requested venue.
    return [
        rule
        for rule in json.loads(raw_json)
        if rule['status'] == '2' and rule['xjcd'] == str(cd_index)
    ]
def get_all_rule_list(use_cache: bool = True):
    """
    Return every push rule, from the local cache file or from the datasource.

    With ``use_cache`` true the JSON list stored in ``ALL_RULE_FILENAME`` is
    read under a shared lock and returned unchanged. Otherwise all records of
    ``WEDA_USER_DATASOURCE`` are queried and the date/time fields of each
    record are replaced in place by formatted strings:

    * ``start_date`` / ``end_date``: epoch milliseconds -> ``YYYY-MM-DD``
      in the ``Asia/Shanghai`` zone
    * ``start_time`` / ``end_time``: milliseconds -> ``HH:MM``
    * ``createdAt``: epoch milliseconds -> ``YYYY-MM-DD HH:MM:SS`` at UTC+8

    :param use_cache: read the local file instead of querying remotely
    :return: list of rules
    """
    print("getting all rule")
    if use_cache:
        print("get from local file...")
        # Take a shared (read) lock for the duration of the read.
        with open(ALL_RULE_FILENAME, 'r') as cache_file:
            fcntl.flock(cache_file, fcntl.LOCK_SH)
            cached_json = cache_file.read()
            fcntl.flock(cache_file, fcntl.LOCK_UN)
        return json.loads(cached_json)

    shanghai_tz = pytz.timezone('Asia/Shanghai')  # Beijing time
    utc_plus_8 = datetime.timezone(datetime.timedelta(hours=8))
    converted_rules = []
    for rule in query_data_by_filter(WEDA_ENV, WEDA_USER_DATASOURCE):
        # Compute every converted value first, then write them back together.
        readable = {
            'start_date': datetime.datetime.fromtimestamp(
                rule['start_date'] / 1000, shanghai_tz).strftime("%Y-%m-%d"),
            'end_date': datetime.datetime.fromtimestamp(
                rule['end_date'] / 1000, shanghai_tz).strftime("%Y-%m-%d"),
            'start_time': _ms_to_hhmm(rule['start_time']),
            'end_time': _ms_to_hhmm(rule['end_time']),
            'createdAt': datetime.datetime.fromtimestamp(
                rule['createdAt'] / 1000, utc_plus_8).strftime('%Y-%m-%d %H:%M:%S'),
        }
        rule.update(readable)
        converted_rules.append(rule)
    print(f"all_rule_list: {len(converted_rules)}")
    return converted_rules


def _ms_to_hhmm(milliseconds):
    """Format a millisecond count as ``HH:MM`` via ``datetime.time``."""
    hours = milliseconds // (1000 * 60 * 60)
    minutes = (milliseconds // (1000 * 60)) % 60
    seconds = (milliseconds // 1000) % 60
    clock = datetime.time(hour=int(hours), minute=int(minutes), second=int(seconds))
    return clock.strftime('%H:%M')
def get_vip_user_list():
    """
    Fetch the list of paying users from the WeDa datasource.

    :return: all records of the ``svip_user_flj50jl`` datasource, as returned
        by ``query_data_by_filter`` with no filter
    """
    vip_datasource = "svip_user_flj50jl"
    return query_data_by_filter(WEDA_ENV, vip_datasource)
def update_record_info_by_id(record_id: str, payload: dict = None):
    """
    Patch one record of the user datasource and return the update count.

    :param record_id: id of the record, placed in the URL as ``('<id>')``
    :param payload: fields sent as the JSON body of the PATCH request
    :return: the ``updateCount`` value of the reply
    :raises Exception: if the status code is not 200, or the reply's
        ``updateCount`` is missing, falsy or not greater than zero; the raw
        response body is used as the message
    """
    token = get_access_token()
    record_url = (
        "https://lowcode-8gsauxtn5fe06776.ap-shanghai.tcb-api.tencentcloudapi.com/weda/odata/v1/batch"
        f"/{WEDA_ENV}/{WEDA_USER_DATASOURCE}('{record_id}')"
    )
    request_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }
    print(record_url)
    response = requests.patch(record_url, headers=request_headers, json=payload, timeout=5)
    print(response.status_code)

    if response.status_code != 200:
        raise Exception(response.text)

    update_count = response.json().get('updateCount')
    if update_count and update_count > 0:
        return update_count
    # A 200 reply that updated nothing is treated as a failure too.
    raise Exception(response.text)
def create_record(payload: dict):
    """
    Create a record in the ``sms_38mzmt2`` datasource.

    :param payload: fields sent as the JSON body of the POST request
    :return: the parsed JSON reply when the service answers with status 201
    :raises Exception: if the status code is not 201; the raw response body
        is used as the message
    """
    access_token = get_access_token()
    query_url = (
        "https://lowcode-8gsauxtn5fe06776.ap-shanghai.tcb-api.tencentcloudapi.com/weda/odata/v1"
        f"/{WEDA_ENV}/sms_38mzmt2"
    )
    query_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {access_token}",
    }
    print(query_url)
    print(payload)
    # Same timeout as update_record_info_by_id, so a stalled connection
    # cannot block the caller forever.
    query_response = requests.post(query_url, headers=query_headers, json=payload, timeout=5)
    print(query_response.status_code)
    print(query_response.text)
    if query_response.status_code != 201:
        raise Exception(query_response.text)
    return query_response.json()
def get_today_active_rule_list(use_cache: bool = True):
    """
    Return the push rules whose ``jrtzcs`` value is greater than zero.

    With ``use_cache`` true the rules are read from the local
    ``ALL_RULE_FILENAME`` JSON file under a shared lock and filtered here;
    otherwise the datasource is queried with the filter ``(jrtzcs gt 0)``.

    NOTE(review): ``jrtzcs`` presumably counts today's notifications -
    confirm against the datasource schema.

    :param use_cache: read the local file instead of querying remotely
    :return: list of matching rules
    """
    print("get_today_active_rule_list")
    if not use_cache:
        print("get from remote...")
        rule_list = query_data_by_filter(WEDA_ENV, WEDA_USER_DATASOURCE, "(jrtzcs gt 0)")
    else:
        print("get from local file...")
        # Take a shared (read) lock for the duration of the read.
        with open(ALL_RULE_FILENAME, 'r') as cache_file:
            fcntl.flock(cache_file, fcntl.LOCK_SH)
            cached_json = cache_file.read()
            fcntl.flock(cache_file, fcntl.LOCK_UN)
        # Parse the JSON text and keep rules with a positive jrtzcs.
        rule_list = [
            rule
            for rule in json.loads(cached_json)
            if rule.get('jrtzcs') and rule['jrtzcs'] > 0
        ]
    print(f"rule_list: {len(rule_list)}")
    return rule_list
# Manual check: running this module directly calls get_access_token(), which
# returns the cached token or requests a new one.
if __name__ == '__main__':
    get_access_token()