generated from xalq-mazza-qilsin/Aiogram3-Template
-
Notifications
You must be signed in to change notification settings - Fork 1
/
app.py
96 lines (68 loc) · 3.03 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import asyncio
from aiogram import Bot, Dispatcher
from aiogram.client.session.middlewares.request_logging import logger
from loader import db
from utils.extra_datas import write_order_to_sheets
from apscheduler.schedulers.asyncio import AsyncIOScheduler
def setup_handlers(dispatcher: Dispatcher) -> None:
    """Register the project's message handlers on *dispatcher*.

    The router object returned by ``handlers.setup_routers()`` is included
    into the dispatcher. The import is done lazily, inside the function.
    """
    from handlers import setup_routers

    routers = setup_routers()
    dispatcher.include_router(routers)
def setup_middlewares(dispatcher: Dispatcher, bot: Bot) -> None:
    """Install message-level middlewares on *dispatcher*.

    Only ``AdminMiddleware`` is active. ``bot`` is accepted but not used
    by the current body.
    """
    # ThrottlingMiddleware is imported but only referenced by the disabled
    # line below; the import is kept so enabling it is a one-line change.
    from middlewares.throttling import ThrottlingMiddleware
    from middlewares.admin import AdminMiddleware

    # Classic built-in anti-spam middleware; the base delay between
    # requests is 0.5 seconds. Currently disabled:
    # dispatcher.message.middleware(ThrottlingMiddleware(slow_mode_delay=0.5))

    admin_middleware = AdminMiddleware()
    dispatcher.message.middleware(admin_middleware)
def setup_filters(dispatcher: Dispatcher) -> None:
    """Install the dispatcher-wide message filter.

    A ``ChatPrivateFilter`` built with ``chat_type=["private"]`` is attached
    to the dispatcher's message observer.
    """
    from filters import ChatPrivateFilter

    # Classic global filter for determining the chat type.
    # The filter can instead be set separately on each router in
    # handlers/users/__init__.
    private_chat_filter = ChatPrivateFilter(chat_type=["private"])
    dispatcher.message.filter(private_chat_filter)
async def setup_aiogram(dispatcher: Dispatcher, bot: Bot) -> None:
    """Configure *dispatcher*: handlers, then middlewares, then filters.

    Logs a message before and after the three setup steps.
    """
    logger.info("Configuring aiogram")

    # The three steps run in this fixed order.
    setup_handlers(dispatcher=dispatcher)
    setup_middlewares(dispatcher=dispatcher, bot=bot)
    setup_filters(dispatcher=dispatcher)

    logger.info("Configured aiogram")
async def database_connected():
    """Prepare the shared ``db`` object for use.

    Awaits ``db.create()`` and then ``db.delete_groups()``, in that order.
    """
    await db.create()
    # NOTE(review): presumably clears stored group records on every start;
    # verify against the db implementation.
    await db.delete_groups()
async def aiogram_on_startup_polling(dispatcher: Dispatcher, bot: Bot, scheduler: AsyncIOScheduler) -> None:
    """Startup callback: prepare the database, aiogram and the scheduler.

    Args:
        dispatcher: Dispatcher that is about to start polling.
        bot: Bot instance used for the Telegram API calls made here.
        scheduler: Scheduler that receives the periodic
            ``write_order_to_sheets`` job and is started at the end.
    """
    from utils.set_bot_commands import set_default_commands
    from utils.notify_admins import on_startup_notify

    # Log only after the call has returned, so the message is never
    # emitted for a connection attempt that failed.
    await database_connected()
    logger.info("Database connected")

    logger.info("Starting polling")
    # Remove any webhook and discard updates queued while the bot was down.
    await bot.delete_webhook(drop_pending_updates=True)
    await setup_aiogram(bot=bot, dispatcher=dispatcher)
    await on_startup_notify(bot=bot)
    await set_default_commands(bot=bot)

    # Run write_order_to_sheets every 20 seconds once the scheduler starts.
    scheduler.add_job(write_order_to_sheets, 'interval', seconds=20)
    scheduler.start()
async def aiogram_on_shutdown_polling(dispatcher: Dispatcher, bot: Bot, scheduler: AsyncIOScheduler):
    """Shutdown callback: stop the scheduler and release bot resources.

    Args:
        dispatcher: Dispatcher whose FSM storage is closed.
        bot: Bot whose HTTP session is closed.
        scheduler: Scheduler to stop; running jobs are not waited for.
    """
    logger.info("Stopping polling")

    # Stop the scheduler first so no new job fires while the resources
    # below are being closed. The guard avoids an error from shutting down
    # a scheduler that was never started.
    if scheduler.running:
        scheduler.shutdown(wait=False)

    # Close the storage even if closing the session raises.
    try:
        await bot.session.close()
    finally:
        await dispatcher.storage.close()
def main():
    """Build the bot, dispatcher and scheduler, then run long polling.

    The bot token, parse mode enum and memory storage are imported locally.
    The call blocks inside ``asyncio.run`` until polling finishes.
    """
    from data.config import BOT_TOKEN
    from aiogram.enums import ParseMode
    from aiogram.fsm.storage.memory import MemoryStorage

    # NOTE(review): newer aiogram releases expect the parse mode via
    # ``default=DefaultBotProperties(...)`` instead of this keyword —
    # confirm against the pinned aiogram version.
    bot = Bot(token=BOT_TOKEN, parse_mode=ParseMode.HTML)
    dispatcher = Dispatcher(storage=MemoryStorage())

    # Stored on the dispatcher under the same name as the ``scheduler``
    # parameter of the startup/shutdown callbacks registered below.
    dispatcher["scheduler"] = AsyncIOScheduler()

    dispatcher.startup.register(aiogram_on_startup_polling)
    dispatcher.shutdown.register(aiogram_on_shutdown_polling)

    polling = dispatcher.start_polling(bot, close_bot_session=True)
    asyncio.run(polling)
if __name__ == "__main__":
    # Script entry point: run the bot and treat Ctrl+C as a normal stop,
    # logging a message instead of printing a traceback.
    try:
        main()
    except KeyboardInterrupt:
        logger.info("Bot stopped!")