-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathmain.py
1495 lines (1306 loc) · 76.8 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/bin/python
import faulthandler
faulthandler.enable()
import asyncio
import datetime
import os
import random
import re
import base64
import urllib.parse
import emoji
import time
import traceback
from openai import OpenAI
import requests, aiohttp
from Hyper import Configurator
import platform
import psutil
import GPUtil
import subprocess
from typing import Set
from PIL import Image
import io
import threading
import paramiko
# import framework
Configurator.cm = Configurator.ConfigManager(Configurator.Config(file="config.json").load_from_file())
bot_name = Configurator.cm.get_cfg().others["bot_name"] #星·简
bot_name_en = Configurator.cm.get_cfg().others["bot_name_en"] #Shining girl
from Hyper import Listener, Events, Logger, Manager, Segments
from Hyper.Utils import Logic
from Hyper.Events import *
#import moudles
from GoogleAI import genai, Context, Parts, Roles
# from google.generativeai.types import FunctonDeclaration
from SearchOnline import network_gpt as SearchOnline
from prerequisites import prerequisite
import Quote
config = Configurator.cm.get_cfg()
logger = Logger.Logger()
logger.set_level(config.log_level)
version_name = "2.0"
cooldowns = {}
cooldowns1 = {}
second_start = time.time()
EnableNetwork = "Pixmap"
user_lists = {}
in_timing = False
emoji_send_count: datetime = None
generating = False
class Tools:
pass
generation_config = {
"temperature": 1,
"top_p": 0.95,
"top_k": 64,
"max_output_tokens": 8192,
"response_mime_type": "text/plain",
}
sys_prompt = f''''''
model = genai.GenerativeModel()
key = Configurator.cm.get_cfg().others["gemini_key"]
reminder: str = Configurator.cm.get_cfg().others["reminder"]
genai.configure(api_key=key)
tools = []
ROOT_User: list = Configurator.cm.get_cfg().others["ROOT_User"]
Super_User: list = []
Manage_User: list = []
sisters: list = []
jhq: list = []
def load_blacklist():
try:
with open("blacklist.sr", "r", encoding="utf-8") as f:
blacklist115 = set(line.strip() for line in f) # 使用集合方便快速查找,不然容易溶血
return blacklist115
except FileNotFoundError:
return set()
class ContextManager:
def __init__(self):
self.groups: dict[int, dict[int, Context]] = {}
def get_context(self, uin: int, gid: int):
try:
return self.groups[gid][uin]
except KeyError:
if self.groups.get(gid):
self.groups[gid][uin] = Context(key, model, tools=tools)
return self.groups[gid][uin]
else:
self.groups[gid] = {}
self.groups[gid][uin] = Context(key, model, tools=tools)
return self.groups[gid][uin]
cmc = ContextManager()
def has_emoji(s: str) -> bool:
# 判断找到的 emoji 数量是否为 1 并且字符串的长度大于等于 1
return emoji.emoji_count(s) == 1 and len(s) == 1
def timing_message(actions: Listener.Actions):
while True:
echo = asyncio.run(actions.custom.get_group_list())
result = Manager.Ret.fetch(echo)
with open("timing_message.ini", "r", encoding="utf-8") as f:
send_time = f.read()
send_time = send_time.split("\n")
send_time = send_time[0].split("⊕")
print(send_time)
now = datetime.datetime.now()
print(f"now {now.hour:02}:{now.minute:02}")
if f"{now.hour:02}:{now.minute:02}" == send_time[0]:
print("send timing messages")
blacklist = load_blacklist() # 在发送消息前加载黑名单,防止返回一个sb空集合
for group in result.data.raw:
group_id = str(group['group_id']) # 将group_id转为字符串类型,不然来个error会溶血
if group_id not in blacklist: # 检查群组 ID 是否在黑名单中,在就别给lz发
asyncio.run(actions.send(group_id=group['group_id'], message=Manager.Message(Segments.Text(send_time[1]))))
time.sleep(random.random()*3)
else:
print(f"群聊{group_id} TM在黑名单,发NM555")
time.sleep(60 - now.second)
def Read_Settings():
global Super_User, Manage_User, sisters, jhq
with open("Super_User.ini", "r") as f:
Super_User = f.read().split("\n")
f.close()
with open("Manage_User.ini", "r") as f:
Manage_User = f.read().split("\n")
f.close()
with open("sisters.ini", "r") as f:
sisters = f.read().split("\n")
f.close()
with open("jhq.ini", "r") as f:
jhq = f.read().split("\n")
f.close()
def Write_Settings(s: list, m: list) -> bool:
s = [item for item in s if item]
m = [item for item in m if item]
global Super_User, Manage_User
su = ""
for item in range(len(s)):
su += s[item]
if item != len(s) - 1:
su += "\n"
ma = ""
for item in range(len(m)):
ma += m[item]
if item != len(m) - 1:
ma += "\n"
try:
with open("Super_User.ini", "w") as f:
f.write(su)
f.close()
with open("Manage_User.ini", "w") as f:
f.write(ma)
f.close()
Super_User = s
Manage_User = m
return True
except:
return False
@Listener.reg
@Logic.ErrorHandler().handle_async
async def handler(event: Events.Event, actions: Listener.Actions) -> None:
global in_timing, bot_name, bot_name_en, reminder
if not in_timing:
Read_Settings()
in_timing = True
thread = threading.Thread(target=timing_message, args=(actions,))
thread.start()
if isinstance(event, Events.HyperListenerStartNotify):
if os.path.exists("restart.temp"):
with open("restart.temp", "r" ,encoding="utf-7") as f:
group_id = f.read()
f.close()
os.remove("restart.temp")
await actions.send(group_id=group_id, message=Manager.Message(Segments.Text(f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Welcome! {bot_name} was restarted successfully. Now you can send {reminder}帮助 to know more.''')))
if isinstance(event, Events.GroupMemberIncreaseEvent):
user = event.user_id
welcome = f''' 加入{bot_name}的大家庭,{bot_name}是你最忠实可爱的女朋友噢o(*≧▽≦)ツ
随时和{bot_name}交流,你只需要在问题的前面加上 {reminder} 就可以啦!( •̀ ω •́ )✧
{bot_name}是你最二次元的好朋友,经常@{bot_name} 看看{bot_name}又学会做什么新事情啦~o((>ω< ))o
祝你在{bot_name}的大家庭里生活愉快!♪(≧∀≦)ゞ☆'''
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Image(f"http://q2.qlogo.cn/headimg_dl?dst_uin={user}&spec=640"), Segments.Text("欢迎"), Segments.At(user), Segments.Text(welcome)))
if isinstance(event, Events.GroupAddInviteEvent):
keywords: list = Configurator.cm.get_cfg().others["Auto_approval"]
cleaned_text = event.comment.strip().lower()
for keyword6 in keywords:
processed_keyword = keyword6.strip().lower()
all_chars_present = True
for char in processed_keyword:
if char not in cleaned_text:
all_chars_present = False
break
if all_chars_present:
await actions.set_group_add_request(flag=event.flag, sub_type=event.sub_type, approve=True, reason="")
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"用户 {event.user_id} 的答案正确,已自动批准,题目数据为 {event.comment} ")))
break
def execute_command(command):
try:
result = subprocess.run(command, capture_output=True, text=True, check=True, shell=True)
# capture_output=True 捕获输出(stdout/stderr)
# text=True 解码为文本字符串,可以返回text
# check=True 当返回非零退出码时引发 CalledProcessError 异常,开不开差不多()
# shell=True 允许使用 shell 的特性,不建议开,不然容易溶血
return {
"stdout": result.stdout,
"stderr": result.stderr,
"returncode": result.returncode,
}
except subprocess.CalledProcessError as e:
return {
"stdout": e.stdout,
"stderr": e.stderr,
"returncode": e.returncode
}
except Exception as e:
return {
"stdout": None,
"stderr": str(e),
"returncode": -1
}
if isinstance(event, Events.GroupMessageEvent):
user_message = str(event.message)
order = ""
global user_lists
global sys_prompt
global second_start
global EnableNetwork
global generating
global Super_User, Manage_User, ROOT_User, sisters,jhq
global model
event_user = (await actions.get_stranger_info(event.user_id)).data.raw
event_user = event_user['nickname']
print(event_user)
# match str(event.message):
# case "ping":
# await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text("pong")))
# case "/生图 Pixiv":
# await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Image("https://pixiv.t.sr-studio.top/img-original/img/2023/01/24/03/53/38/104766095_p0.png")))
print(event.user_id)
if str(event.user_id) in jhq:
print("My Kids")
sys_prompt = prerequisite(bot_name, event_user).mother()
else:
if str(event.user_id) in sisters:
print("My little sister")
sys_prompt = prerequisite(bot_name, event_user).sister()
else:
sys_prompt = prerequisite(bot_name, event_user).girl_friend()
if "ping" == user_message:
print(str(event.user_id))
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text("pong! 爆炸!v(◦'ωˉ◦)~♡ ")))
elif f"{bot_name}真棒" in user_message:
i = random.randint(1,3)
match i:
case 1:
m = "啊!老……老公,别怎么说啦,人……人家好害羞的啦,人家还会努力的(*ᴗ͈ˬᴗ͈)ꕤ*.゚"
case 2:
m = "啊~老公~你不要这么夸人家啦~〃∀〃"
case 3:
m = "唔……谢……谢谢老公啦🥰~"
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(m)))
# not_allowed_word = ["小塑塑真棒", "小塑塑棒不棒"]
# for item in not_allowed_word:
# contains = []
# for p in range(len(item)):
# if item[p] in user_message:
# contains.append("1")
# if len(contains) >= len(item):
# try:
# await actions.del_message(event.message_id)
# except:
# pass
# break
global emoji_send_count
if has_emoji(user_message):
if emoji_send_count is None or datetime.datetime.now() - emoji_send_count > datetime.timedelta(seconds=15):
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(user_message)))
emoji_send_count = datetime.datetime.now()
else:
print(f"emoji +1 延迟 {abs(datetime.datetime.now() - emoji_send_count)} s")
if user_message.startswith(reminder):
order_i = user_message.find(reminder)
if order_i != -1:
order = user_message[order_i + len(reminder):].strip()
print("收到命令 " + order)
elif user_message.startswith(reminder):
order_i = user_message.find(reminder)
if order_i != -1:
order = user_message[order_i + len(reminder):].strip()
print("收到命令 " + order)
if f"{reminder}重启" == user_message:
if str(event.user_id) in Super_User or str(event.user_id) in ROOT_User or str(event.user_id) in Manage_User:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"Restarting in progress……")))
try:
with open("restart.temp", "w" ,encoding="utf-7") as f:
f.write(str(event.group_id))
f.close()
except:
pass
Listener.restart()
else:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"不能这么做!那是一块丞待开发的禁地,可能很危险,{bot_name}很胆小……꒰>﹏< ꒱")))
elif "runcommand " in order:
blacklist_file = "blacklist.sr"
if str(event.user_id) in Manage_User or str(event.user_id) in Super_User or str(event.user_id) in ROOT_User:
order = order.removeprefix("runcommand").strip()
order_lower = order.lower()
print(order_lower)
# 定义危险命令
dangerous_commands = ["rm", "vi", "vim", "tsab", "del", "rmdir", "format", "shutdown", "shutdown.exe"]
# 检查危险命令
if any(dangerous_cmd in order_lower for dangerous_cmd in dangerous_commands):
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text("❌ ERROR 危险命令,已屏蔽。\nℹ️ INFO None.")))
return
match order_lower:
case cmd if re.match(r"^scheduled sends.*", cmd):
print("使用命令定时")
try:
send_time = order_lower[order_lower.find("scheduled sends ") + len("scheduled sends "):].strip()
if not re.match(r'^([01][0-9]|2[0-3]):([0-5][0-9])$', send_time[:5]):
r = f'''命令执行结果:
❌ERROR {bot_name}不能识别给定的时间是什么 Σ( ° △ °|||)︴
ℹ️ INFO 举个🌰子:{reminder}runcommand scheduled sends 00:00 早安 —> 即可让{bot_name}在0点0分准时问候早安噢⌯oᴗo⌯'''
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(r)))
else:
timing_settings = f"{send_time[:5]}⊕{send_time[6::]}"
with open("timing_message.ini", "w", encoding="utf-8") as f:
f.write(timing_settings)
r = f'''命令执行结果:
ℹ️ INFO {bot_name}设置成功!(*≧▽≦) '''
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(r)))
except Exception as e:
r = f'''命令执行结果:
❌ERROR {str(type(e))}
❌ERROR {bot_name}设置失败了…… (╥﹏╥)'''
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(r)))
case "restart":
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"""命令执行结果:
⚠️ WARN 正在退出(Ctrl+C)
ℹ️ INFO 重新启动监听器....""")))
try:
with open("restart.temp", "w", encoding="utf-7") as f:
f.write(str(event.group_id))
except Exception as e:
print(f"Error saving restart info: {e}")
Listener.restart()
case "message clear":
global cmc
del cmc
cmc = ContextManager()
user_lists.clear()
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text("命令执行结果:\nℹ️ INFO 清除完成")))
case cmd if re.match(r"^set_group_ban.*", cmd):
start_index = order_lower.find("set_group_ban")
if start_index != -1:
result = order[start_index + len("set_group_ban"):].strip()
user_and_duration = re.findall(r'\d+', result)
if len(user_and_duration) == 2:
print("At in loading...")
user_id = user_and_duration[0]
ban_duration = user_and_duration[1]
await actions.set_group_ban(group_id=event.group_id, user_id=user_id, duration=ban_duration)
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"命令执行结果:\nℹ️ INFO 将{user_id}在{event.group_id}中禁言{ban_duration}秒\nℹ️ INFO None.")))
case cmd if re.match(r"^set_group_kick.*", cmd):
start_index = order.find("set_group_kick")
if start_index != -1:
result = order[start_index + len("set_group_kick"):].strip()
user_id = re.search(r'\d+', result).group()
await actions.set_group_kick(group_id=event.group_id, user_id=user_id)
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"命令执行结果:\nℹ️ INFO 将{user_id}从{event.group_id}中踢出\nℹ️ INFO None.")))
case cmd if re.match(r"^scheduled_sends_black add.*", cmd):
black_add_target = order[order.find("scheduled_sends_black add ") + len("scheduled_sends_black add "):].strip()
print(black_add_target)
def load_blacklist():
try:
with open(blacklist_file, "r", encoding="utf-8") as f:
return set(line.strip() for line in f)
except FileNotFoundError:
return set()
blacklist_content = load_blacklist()
if black_add_target not in blacklist_content:
blacklist_content.add(black_add_target)
try:
with open(blacklist_file, "w", encoding="utf-8") as f:
for item in blacklist_content:
f.write(item + "\n")
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"命令执行结果:\nℹ️ INFO 黑名單添加成功, 現列表:{', '.join(blacklist_content)}")))
except Exception as e:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"命令执行结果:\n❌ ERROR 黑名單添加失败, 原因:{e}")))
else:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"命令执行结果:\n❌ ERROR 黑名單添加失败, 原因:群{black_add_target}已在群发黑名單!")))
case cmd if re.match(r"^scheduled_sends_black del.*", cmd):
black_del_target = order[order.find("scheduled_sends_black del ") + len("scheduled_sends_black del "):].strip()
blacklist_content = load_blacklist()
if black_del_target in blacklist_content:
blacklist_content.remove(black_del_target)
try:
with open(blacklist_file, "w", encoding="utf-8") as f:
for item in blacklist_content:
f.write(item + "\n")
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"命令执行结果:\nℹ️ INFO 黑名單删除成功, 現列表:{', '.join(blacklist_content)}")))
except Exception as e:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"命令执行结果:\n❌ ERROR 黑名單删除失败, 原因:{e}")))
else:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"命令执行结果:\n❌ ERROR 黑名單删除失败, 原因:群{black_del_target}不在群发黑名單!")))
case cmd if re.match(r"^scheduled_sends_black list.*", cmd):
blacklist_content = load_blacklist()
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"黑名单列表加载完成: {', '.join(blacklist_content)}")))
case _:
# 执行用户的命令
command_result = execute_command(order)
if command_result["returncode"] == 0:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"命令执行结果:\nℹ️ INFO 执行成功\nℹ️ INFO {command_result['stdout']}.")))
if command_result["stderr"]:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"命令执行结果:\n❌ ERROR 执行失败, 代码命令可能有误\nℹ️ INFO {command_result['stderr']}.")))
else:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"命令执行结果:\n❌ ERROR 执行失败, 代码命令可能有误\nℹ️ INFO {command_result['stderr']}.\n❌ ERROR 返回码:{command_result['returncode']}.")))
else:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"不能这么做!那是一块丞待开发的禁地,可能很危险,{bot_name}很胆小……꒰>﹏< ꒱")))
elif "默认4" in order:
EnableNetwork = "Net"
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text("嗯……我好像升级了!o((>ω< ))o")))
elif "默认3.5" in order:
EnableNetwork = "Normal"
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text("切换到大模型中运行ο(=•ω<=)ρ⌒☆")))
elif "读图" in order:
EnableNetwork = "Pixmap"
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"{bot_name}打开了新视界!o(*≧▽≦)ツ")))
elif "列出黑名单" in order:
if str(event.user_id) in Super_User or str(event.user_id) in ROOT_User or str(event.user_id) in Manage_User:
try:
with open("blacklist.sr", "r", encoding="utf-8") as f:
blacklist1 = set(line.strip() for line in f)
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"黑名单列表加载完成: {blacklist1}")))
except FileNotFoundError:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text("黑名单列表加载失败,原因:没有文件")))
except UnicodeDecodeError:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text("黑名单列表加载失败,原因:解码失败")))
else:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"不能这么做!那是一块丞待开发的禁地,可能很危险,{bot_name}很胆小……꒰>﹏< ꒱")))
elif "添加黑名单 " in order:
blacklist_file = "blacklist.sr"
def load_blacklist():
try:
with open(blacklist_file, "r", encoding="utf-8") as f:
blacklist115 = set(line.strip() for line in f) # 使用集合方便快速查找,不然容易溶血
return blacklist115
except FileNotFoundError:
return set()
if str(event.user_id) in Super_User or str(event.user_id) in ROOT_User or str(event.user_id) in Manage_User:
Toset2 = order[order.find("添加黑名单 ") + len("添加黑名单 "):].strip()
blacklist114 = load_blacklist() # 加载现有的黑名单,防止已修改沒更新
if Toset2 not in blacklist114:
blacklist114.add(Toset2)
try:
with open(blacklist_file, "w", encoding="utf-8") as f:
for item in blacklist114:
f.write(item + "\n") # 防止之前的丟失555,并添加换行符
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"黑名單添加成功,現列表:{blacklist114}")))
except Exception as e:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"黑名單添加失败,原因:{e}")))
else:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"黑名單添加失败,原因:群{Toset2}已在黑名單!")))
else:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"不能这么做!那是一块丞待开发的禁地,可能很危险,{bot_name}很胆小……꒰>﹏< ꒱")))
elif "删除黑名单 " in order:
blacklist_file = "blacklist.sr"
def load_blacklist():
try:
with open(blacklist_file, "r", encoding="utf-8") as f:
blacklist116 = set(line.strip() for line in f) # 使用集合方便快速查找,不然容易溶血
return blacklist116
except FileNotFoundError:
return set()
if str(event.user_id) in Super_User or str(event.user_id) in ROOT_User or str(event.user_id) in Manage_User:
Toset1 = order[order.find("删除黑名单 ") + len("删除黑名单 "):].strip()
blacklist117 = load_blacklist() # 加载现有的黑名单,防止已修改沒更新
if Toset1 in blacklist117:
blacklist117.remove(Toset1)
try:
with open(blacklist_file, "w", encoding="utf-8") as f:
for item in blacklist117:
f.write(item + "\n") # 防止之前的丟失555,并添加换行符
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"黑名單刪除成功,現列表:{blacklist117}")))
except Exception as e:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"黑名單刪除失败,原因:{e}")))
else:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"黑名單刪除失败,原因:群{Toset1}不在黑名單!")))
else:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"不能这么做!那是一块丞待开发的禁地,可能很危险,{bot_name}很胆小……꒰>﹏< ꒱")))
elif "删除管理 " in order:
r = ""
if str(event.user_id) in Super_User or str(event.user_id) in ROOT_User:
Toset = order[order.find("删除管理 ") + len("删除管理 "):].strip()
s = Super_User
m = Manage_User
if Toset in ROOT_User:
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Failed: The specified user is a ROOT_User and group ROOT_User is read only.'''
else:
if Toset in s:
s.remove(Toset)
if Toset in m:
m.remove(Toset)
if Write_Settings(s, m):
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Succeeded: @{Toset} is a Common User now.
Now use {reminder}帮助 to know what permissions you have now.'''
else:
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Failed: Settings files are not writeable.'''
else:
r = f"不能这么做!那是一块丞待开发的禁地,可能很危险,{bot_name}很胆小……꒰>﹏< ꒱"
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(r)))
elif "管理 " in order:
r = ""
if str(event.user_id) in Super_User or str(event.user_id) in ROOT_User:
if "管理 M " in order:
Toset = order[order.find("管理 M ") + len("管理 M "):].strip()
print(f"try to get_user {Toset}")
nikename = (await actions.get_stranger_info(Toset, no_cache=True)).data.raw
print(str(nikename))
if len(nikename) == 0:
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Failed: {Toset} is not a valid user.'''
else:
nikename = nikename['nickname']
m = Manage_User
s = Super_User
if Toset in Manage_User:
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Succeeded: {nikename}(@{Toset}) has become a Manage_User.'''
elif Toset in Super_User:
s.remove(Toset)
m.append(Toset)
if Write_Settings(s, m):
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Succeeded: {nikename}(@{Toset}) has become a Manage_User.
Now use {reminder}帮助 to know what permissions you have now.'''
else:
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Failed: Settings files are not writeable.'''
elif Toset in ROOT_User:
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Failed: The specified user is a ROOT_User and group ROOT_User is read only.'''
else:
m.append(Toset)
if Write_Settings(s, m):
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Succeeded: {nikename}(@{Toset}) has become a Manage_User.
Now use {reminder}帮助 to know what permissions you have now.'''
else:
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Failed: Settings files are not writeable.'''
elif "管理 S " in order:
Toset = order[order.find("管理 S ") + len("管理 S "):].strip()
print(f"try to get_user {Toset}")
nikename = (await actions.get_stranger_info(Toset, no_cache=True)).data.raw
print(str(nikename))
if len(nikename) == 0:
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Failed: {Toset} is not a valid user.'''
else:
nikename = nikename['nickname']
m = Manage_User
s = Super_User
if Toset in Manage_User:
m.remove(Toset)
s.append(Toset)
if Write_Settings(s, m):
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Succeeded: {nikename}(@{Toset}) has become a Super_User.
Now use {reminder}帮助 to know what permissions you have now.'''
else:
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Failed: Settings files are not writeable.'''
elif Toset in Super_User:
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Succeeded: {nikename}(@{Toset}) has become a Super_User.'''
elif Toset in ROOT_User:
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Failed: The specified user is a ROOT_User and group ROOT_User is read only.'''
else:
s.append(Toset)
if Write_Settings(s, m):
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Succeeded: {nikename}(@{Toset}) has become a Super_User.
Now use {reminder}帮助 to know what permissions you have now.'''
else:
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Failed: Settings files are not writeable.'''
else:
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Failed: Only Manage_User or Super_User could be set.'''
else:
r = f"不能这么做!那是一块丞待开发的禁地,可能很危险,{bot_name}很胆小……꒰>﹏< ꒱"
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(r)))
elif "让我访问" in order:
if str(event.user_id) in Super_User or str(event.user_id) in ROOT_User or str(event.user_id) in Manage_User:
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
sisters: {sisters}
————————————————————
Manage_User: {Manage_User}
Super_User: {Super_User}
ROOT_User: {ROOT_User}
If you are a Super_User or ROOT_User, you can manage these users. Use {reminder}帮助 to know more.'''
else:
r = f"不能这么做!那是一块丞待开发的禁地,可能很危险,{bot_name}很胆小……꒰>﹏< ꒱"
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(r)))
elif "帮助" in order:
if str(event.user_id) in ROOT_User or str(event.user_id) in Super_User:
content = f'''管理我们的{bot_name}
————————————————————
你拥有管理{bot_name}的权限。若要查看普通帮助,请@{bot_name}
1. {reminder}让我访问 —> 检索用有权限的用户
2. {reminder}管理 M (QQ号,必填) —> 为用户添加 Manage_User 权限
3. {reminder}管理 S (QQ号,必填) —> 为用户添加 Super_User 权限
4. {reminder}删除管理 (QQ号,必填) —> 删除这个用户的全部权限
5. {reminder}禁言 (@QQ+空格+时间(以秒为单位),必填) —> 禁言用户一段时间
6. {reminder}解禁 (@QQ,必填) —> 解除该用户禁言
7. {reminder}踢出 (@QQ,必填) —> 将该用户踢出聊群
8. 撤回 (引用一条消息) —> 撤回该消息
9. {reminder}注销 —> 删除所有用户的上下文
10. {reminder}修改 (hh:mm) (内容,必填) —> 改变定时消息时间与内容
11. {reminder}感知 —> 查看运行状态
12. {reminder}核验 (QQ号,必填) —> 检索QQ账号信息
13. {reminder}重启 —> 关闭所有线程和进程,关闭{bot_name}。然后重新启动{bot_name}。
14. {reminder}添加黑名单 +空格 + 群号 —> 将该群加入群发黑名单
15. {reminder}删除黑名单 +空格 + 群号 —> 将该群移除群发黑名单
16. {reminder}列出黑名单 —> 列出黑名单中的所有群
你的每一步操作,与用户息息相关。'''
elif str(event.user_id) in Manage_User:
content = f'''管理我们的{bot_name}
————————————————————
你拥有管理{bot_name}的权限。若要查看普通帮助,请@{bot_name}
1. {reminder}让我访问 —> 检索用有权限的用户
2. {reminder}注销 —> 删除所有用户的上下文
3. {reminder}修改 (hh:mm) (内容,必填) —> 改变定时消息时间与内容
4. {reminder}感知 —> 查看运行状态
5. {reminder}核验 (QQ号,必填) —> 检索QQ账号信息
6. {reminder}重启 —> 关闭所有线程和进程,关闭{bot_name}。然后重新启动{bot_name}
7. {reminder}禁言 (@QQ+空格+时间(以秒为单位),必填) —> 禁言用户一段时间
8. {reminder}解禁 (@QQ,必填) —> 解除该用户禁言
9. {reminder}踢出 (@QQ,必填) —> 将该用户踢出聊群
10. 撤回 (引用一条消息) —> 撤回该消息
11. {reminder}添加黑名单 +空格 + 群号 —> 将该群加入群发黑名单
12. {reminder}删除黑名单 +空格 + 群号 —> 将该群移除群发黑名单
13. {reminder}列出黑名单 —> 列出黑名单中的所有群
你的每一步操作,与用户息息相关。'''
else:
p = " "
n = " "
r = " "
match EnableNetwork:
case "Pixmap":
p = "(当前)"
case "Normal":
r = "(当前)"
case "Net":
n = "(当前)"
content = f'''如何与{bot_name}交流( •̀ ω •́ )✧
注:对话前必须加上 {reminder} 噢!~
1. {reminder}(任意问题,必填) —> {bot_name}回复
2. {reminder}名言【引用一条消息】 —> {bot_name}将消息载入史册
3. {reminder}读图{p}—> {bot_name}可以查看您发送的图片
4. {reminder}默认4{n}—> {bot_name}的快速回复通道✧
5. {reminder}默认3.5{r}—> {bot_name}的快速回复通道✧
6. {reminder}大头照 【@一个用户】—> {bot_name}给他拍张大头照
7. {reminder}生图 Pixiv (标签,必填,用&分割) —> {bot_name}浏览P站
8. {reminder}生图 ACG (任意类型,必填) —> {bot_name}制作精美二次元壁纸
9. {reminder}做我姐姐吧 / {reminder}当我女朋友(默认)/ {reminder}做我mm吧 —> {bot_name}切换不同的角色互动噢!~
快来聊天吧(*≧︶≦)'''
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(content)))
elif (isinstance(event.message[0], Segments.At) and int(event.message[0].qq) == event.self_id):
p = " "
n = " "
r = " "
match EnableNetwork:
case "Pixmap":
p = "(当前)"
case "Normal":
r = "(当前)"
case "Net":
n = "(当前)"
content = f'''如何与{bot_name}交流( •̀ ω •́ )✧
注:对话前必须加上 {reminder} 噢!~
1. {reminder}(任意问题,必填) —> {bot_name}回复
2. {reminder}名言【引用一条消息】 —> {bot_name}将消息载入史册
3. {reminder}读图{p}—> {bot_name}可以查看您发送的图片
4. {reminder}默认4{n}—> {bot_name}的快速回复通道✧
5. {reminder}默认3.5{r}—> {bot_name}的快速回复通道✧
6. {reminder}大头照 【@一个用户】—> {bot_name}给他拍张大头照
7. {reminder}生图 Pixiv (标签,必填,用&分割) —> {bot_name}浏览P站
8. {reminder}生图 ACG (任意类型,必填) —> {bot_name}制作精美二次元壁纸
9. {reminder}做我姐姐吧 / {reminder}当我女朋友(默认)/ {reminder}做我mm吧 —> {bot_name}切换不同的角色互动噢!~
快来聊天吧(*≧︶≦)'''
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(content)))
elif "关于" in order:
global version_name
about = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Build Information
Version:{version_name}
Powered by NapCat.OneBot
Rebuilt from HypeR
————————————————————
Third-party API
1. Mirokoi API
2. Lolicon API
2. LoliAPI API
4. ChatGPT 3.5-turbo-16k
5. ChatGPT 4o-mini
6. Google gemini-2.0-flash-thinking-exp-01-21
————————————————————
Copyright
Made by SR Studio
2019~2025 All rights reserved'''
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(about)))
elif "当我女朋友" in order:
st = sisters
if str(event.user_id) in st:
st.remove(str(event.user_id))
st = [item for item in st if item]
sts = ""
for item in range(len(st)):
sts += st[item]
if item != len(st) - 1:
sts += "\n"
jh = jhq
if str(event.user_id) in jh:
jh.remove(str(event.user_id))
jh = [item for item in jh if item]
jhs = ""
for item in range(len(jh)):
jhs += jh[item]
if item != len(jh) - 1:
jhs += "\n"
try:
with open("sisters.ini", "w") as f:
f.write(sts)
f.close()
sisters = st
with open("jhq.ini", "w") as f:
f.write(jhs)
f.close()
jhq = jh
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text("老公~你回来啦~(*≧︶≦)")))
except Exception as e:
print(traceback.format_exc)
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"可是{bot_name}还想继续做你的姐姐,这样我就可以保护你了!(๑•̀ㅂ•́)و✧")))
elif "做我姐姐吧" in order:
st = sisters
if str(event.user_id) not in st:
st.append(str(event.user_id))
st = [item for item in st if item]
sts = ""
for item in range(len(st)):
sts += st[item]
if item != len(st) - 1:
sts += "\n"
jh = jhq
if str(event.user_id) in jh:
jh.remove(str(event.user_id))
jh = [item for item in jh if item]
jhs = ""
for item in range(len(jh)):
jhs += jh[item]
if item != len(jh) - 1:
jhs += "\n"
try:
with open("sisters.ini", "w") as f:
f.write(sts)
f.close()
sisters = st
with open("jhq.ini", "w") as f:
f.write(jhs)
f.close()
jhq = jh
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text("你好呀!妹妹!~o(*≧▽≦)ツ")))
except Exception as e:
print(traceback.format_exc)
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"呜呜呜……{bot_name}还想继续做你的女朋友,依赖你 (*/ω\*)")))
elif "做我mm吧" in order:
st = sisters
if str(event.user_id) in st:
st.remove(str(event.user_id))
st = [item for item in st if item]
sts = ""
for item in range(len(st)):
sts += st[item]
if item != len(st) - 1:
sts += "\n"
jh = jhq
if str(event.user_id) not in jh:
jh.append(str(event.user_id))
jh = [item for item in jh if item]
jhs = ""
for item in range(len(jh)):
jhs += jh[item]
if item != len(jh) - 1:
jh += "\n"
try:
with open("sisters.ini", "w") as f:
f.write(sts)
f.close()
sisters = st
with open("jhq.ini", "w") as f:
f.write(jhs)
f.close()
jhq = jh
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text("你好呀!血小板!~o(*≧▽≦)ツ")))
except Exception as e:
print(traceback.format_exc)
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"呜呜呜……{bot_name}还想继续做你的女朋友,依赖你 (*/ω\*)")))
elif "核验 " in order:
if str(event.user_id) in Super_User or str(event.user_id) in ROOT_User or str(event.user_id) in Manage_User:
uid = order[order.find("核验 ") + len("核验 "):].strip()
print(f"try to get_user {uid}")
nikename = (await actions.get_stranger_info(uid)).data.raw
print(f"get {nikename} successfully")
if len(nikename) == 0:
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
Failed: {uid} is not a valid user.'''
else:
items = [f"{key}: {value}" for key, value in nikename.items()]
result = "\n".join(items)
r = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
{result}'''
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(r)))
else:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"不能这么做!那是一块丞待开发的禁地,可能很危险,{bot_name}很胆小……꒰>﹏< ꒱")))
elif f"{reminder}感知" in str(event.message):
if str(event.user_id) in Super_User or str(event.user_id) in ROOT_User or str(event.user_id) in Manage_User:
system_info = get_system_info()
feel = f'''{bot_name} {bot_name_en} - 简单 可爱 个性 全知
————————————————————
System Now
Running {seconds_to_hms(round(time.time() - second_start, 2))}
Syetem Version:{system_info["version_info"]}
Architecture:{system_info["architecture"]}
CPU Usage:{str(system_info["cpu_usage"]) + "%"}
Memory Usage:{str(system_info["memory_usage_percentage"]) + "%"}'''
for i, usage in enumerate(system_info["gpu_usage"]):
feel = feel + f"\nGPU {i} Usage:{usage * 100:.2f}%"
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(feel)))
else:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"不能这么做!那是一块丞待开发的禁地,可能很危险,{bot_name}很胆小……꒰>﹏< ꒱")))
elif f"{reminder}注销" in str(event.message):
if str(event.user_id) in Super_User or str(event.user_id) in ROOT_User or str(event.user_id) in Manage_User:
# global cmc
del cmc
cmc = ContextManager()
user_lists.clear()
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"卸下包袱,{bot_name}更轻松了~ (/≧▽≦)/")))
else:
await actions.send(group_id=event.group_id, message=Manager.Message(Segments.Text(f"不能这么做!那是一块丞待开发的禁地,可能很危险,{bot_name}很胆小……꒰>﹏< ꒱")))
elif f"{reminder}名言" in str(event.message):
print("获取名言")
imageurl = None
if isinstance(event.message[0], Segments.Reply):
print("有消息反馈")
msg_id = event.message[0].id
content = await actions.get_msg(msg_id)
message = content.data["message"]
message = gen_message({"message": message})