-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrouters.py
More file actions
120 lines (102 loc) · 5.26 KB
/
routers.py
File metadata and controls
120 lines (102 loc) · 5.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import asyncio
from aiogram import F, Router,types
from aiogram.filters import CommandStart,Command
from aiogram.fsm.state import State, StatesGroup
from aiogram.fsm.context import FSMContext
from aiogram.types import Message,FSInputFile,ReplyKeyboardRemove,ReplyKeyboardMarkup,KeyboardButton
import barcods as br
import database as db
import keyboard as kb
import logging
log=logging.getLogger()
router=Router()
class Broadcast(StatesGroup):
message_to_broadcast=State()
from_where_chat_id=State()
from_where_id_message=State()
is_to_admin=State()
class ChoosenUser(StatesGroup):
username_to_search=State()
is_make_admin=State()
@router.message(CommandStart())
async def start(message:Message)->None:
await message.answer("Hi")
is_there=await db.add_user(message.from_user.id,message.chat.id,message.from_user.username)
if is_there==True:
await message.answer("You are connected to bot!",reply_markup=kb.main_panel)
elif is_there==False:
if await db.get_user(message.from_user.id,False,True) is not None:
await message.answer("You are already in bot!",reply_markup=kb.main_panel_admin)
else:
await message.answer("You are already in bot!",reply_markup=kb.main_panel)
@router.message(F.text == 'Print barcode ⌨')
async def print_barcode(message:Message)->None:
barcode=await br.barcode(message.from_user.id)
barcode=FSInputFile(f"{barcode}.png")
await message.answer_photo(barcode,caption="It's your barcode! On buy something give it to seller for recieve a discount!")
@router.message(F.text == 'Make admin⏫')
async def make_admin(message:Message,state:FSMContext)->None:
if await db.get_user(id=message.from_user.id,get_all=False,get_admin=True) is not None:
keyboards=await db.get_usernames_list(get_admins=False)
if not keyboards:
await message.answer("Error! Have not users")
log.error("Error! Have not users")
return
await message.answer("Choose people to make admin:",reply_markup=ReplyKeyboardMarkup(keyboard=await kb.generate_list_keyboard(keyboards),resize_keyboard=True))
await state.set_state(ChoosenUser.username_to_search)
await state.update_data(is_make_admin=True)
else:
await message.answer("You are haven't permissions to do this command")
@router.message(F.text == 'Unmake admin⏬')
async def unmake_admin(message:Message,state:FSMContext)->None:
if await db.get_user(id=message.from_user.id,get_all=False,get_admin=True) is not None:
keyboards=await db.get_usernames_list(get_admins=True)
if not keyboards:
await message.answer("Error! Have not admins")
log.error("Error! Have not admins")
return
await message.answer("Choose people to unmake admin:",reply_markup=ReplyKeyboardMarkup(keyboard=await kb.generate_list_keyboard(keyboards),resize_keyboard=True))
await state.set_state(ChoosenUser.username_to_search)
await state.update_data(is_make_admin=False)
else:
await message.answer("You are haven't permissions to do this command")
@router.message(ChoosenUser.username_to_search)
async def process_user(message:Message,state:FSMContext):
await state.update_data(username_to_search=message.text)
data=await state.get_data()
await state.clear()
username=data["username_to_search"]
user=await db.get_user_by_username(username)
await db.change_admin_status(user,data["is_make_admin"])
if await db.get_user(id=message.from_user.id,get_all=False,get_admin=True) is not None:
keyboard=kb.main_panel_admin
else:
keyboard=kb.main_panel
await message.answer(f"Changed admin status to user:{user}",reply_markup=keyboard)
log.warning(f"Changed admin status to user:{user}")
@router.message(F.text == 'Make broadcast📢')
async def broadcast(message:Message,state:FSMContext)->None:
await message.reply("Okay, send message to broadcast")
await state.set_state(Broadcast.message_to_broadcast)
@router.message(Broadcast.message_to_broadcast)
async def process_message(message:Message,state:FSMContext)->None:
await state.update_data(from_where_id_message=message.message_id)
await state.update_data(from_where_chat_id=message.chat.id)
await message.answer("Is will this message sended only to admins?",reply_markup=kb.is_for_admin,)
await state.set_state(Broadcast.is_to_admin)
@router.message(Broadcast.is_to_admin)
async def process_is_to_admin(message:Message,state:FSMContext)->None:
if message.text=="Yes":
await state.update_data(is_to_admin=True)
elif message.text=="No":
await state.update_data(is_to_admin=False)
await message.answer("Ok! Got it! Trying to send", reply_markup=ReplyKeyboardRemove())
data=await state.get_data()
await state.clear()
users=await db.get_users_is_admin(data["is_to_admin"])
for id,chat_id in users.items():
await message.bot.forward_message(chat_id=chat_id,from_chat_id=data["from_where_chat_id"],message_id=data["from_where_id_message"],)
if await db.get_user(message.from_user.id,False,True) is not None:
await message.answer("Сhoose command to use!",reply_markup=kb.main_panel_admin)
else:
await message.answer("Сhoose command to use!",reply_markup=kb.main_panel)