-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
69 lines (56 loc) · 1.95 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
"""
Entry point of the application: the FastAPI app is created here, its
startup and shutdown hooks are defined, and the updates route is registered.
"""
import sys
import logging
import fastapi
from app.bot.presentation import router
from app.bot.settings.internal.conf import port
from app.bot.services.external.aiogram import bot
from app.bot.services.external.alchemy import models
from app.bot.services.external.alchemy import engine
from app.bot.services.external.aiogram import dispatcher as dp
from app.bot.settings.internal.conf.bot import NOTIFICATION_ID
from app.bot.settings.internal.conf.bot import IS_WEBHOOK_ENABLED
# ASGI application instance; the lifecycle hooks and the route below attach to it.
app = fastapi.FastAPI()
@app.on_event("startup")
async def startup_event():
    """
    Prepare the database and logging, then start the bot.

    Steps performed on application startup:

    1. Create the tables declared on ``models.Base`` using ``engine``.
    2. Configure root logging at INFO level, writing to stdout.
    3. Start the bot in one of two modes, chosen by ``IS_WEBHOOK_ENABLED``:

       * truthy  -> webhook mode: if the webhook currently registered for
         the bot differs from this service's URL, it is replaced (pending
         updates are dropped) and the change is reported to the
         ``NOTIFICATION_ID`` chat; a "started" message is then sent.
       * falsy   -> polling mode: the webhook is removed (pending updates
         are dropped), a "started" message is sent, and polling begins.
    """
    # Creates tables for the declared models; this is table creation,
    # not a schema migration (existing tables are not altered).
    models.Base.metadata.create_all(bind=engine)
    logging.basicConfig(level=logging.INFO, stream=sys.stdout)

    if IS_WEBHOOK_ENABLED:
        current_webhook = await bot.get_webhook_info()
        service_webhook = (
            f"{port.SERVICE_ADDRESS}/{port.SERVICE_NAME}/v1/bot/webhook/"
        )

        # Only touch the registered webhook when it does not already point
        # at this service, so restarts do not needlessly drop updates.
        if current_webhook.url != service_webhook:
            await bot.delete_webhook()
            await bot.set_webhook(
                url=service_webhook,
                drop_pending_updates=True
            )
            await bot.send_message(
                chat_id=NOTIFICATION_ID,
                text=f"✅ Webhook URL has been reset to - {service_webhook}"
            )

        await bot.send_message(
            chat_id=NOTIFICATION_ID,
            text="✅ Bot has been started with webhook mode"
        )
        return

    # Polling mode. Reaching this point already means the flag is falsy;
    # the previous ``is False`` identity check skipped this branch for
    # values such as ``None`` or ``0``, leaving the bot started in neither
    # mode without any error.
    await bot.delete_webhook(drop_pending_updates=True)
    await bot.send_message(
        chat_id=NOTIFICATION_ID,
        text="✅ Bot has been started with polling mode"
    )
    # NOTE(review): this awaits the polling call inside the startup hook,
    # so the hook does not return until polling ends - confirm that the
    # HTTP side of the app is not expected to serve in polling mode.
    await dp.start_polling(bot)
@app.on_event("shutdown")
async def on_shutdown():
    """Close the bot's session when the application shuts down."""
    bot_session = bot.session
    await bot_session.close()
# Register the updates route; `router.updates_route` is unpacked as the
# keyword arguments of `add_api_route`, so it must be a mapping of them.
app.add_api_route(**router.updates_route)