Files
tg-utility/main.py
Nikita Tyukalov, ASUS, Linux 40385167e3 Initial commit
2025-11-22 23:39:39 +03:00

89 lines
2.1 KiB
Python

''' Application Entry Point '''
import importlib
import asyncio
import signal
import os
import utils
import robot
import hubot
import config
# get list of mods
_mods = {}
# import all mods
for m in utils.get_all_mods():
_mods[m] = importlib.import_module(m)
# application config
_config = None
# task of termination routine
_termination_task = None
# termination event
_termination_event = None
async def set_termination_event() -> None:
''' Just set the termination event '''
_termination_event.set()
def signal_handler() -> None:
''' Start termination task '''
global _termination_task
print()
# already terminating - suicide
if _termination_task is not None:
print('[!] Suiciding!')
os.kill(os.getpid(), signal.SIGKILL)
# not suicide yet
print('[I] Trying to terminate gracefully...')
_termination_task = asyncio.ensure_future(set_termination_event())
async def main() -> None:
global _config, _termination_event
print('[I] tg-utility')
# create termination event
_termination_event = asyncio.Event()
# setup the signal handlers
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, signal_handler)
loop.add_signal_handler(signal.SIGTERM, signal_handler)
# load the config
_config = config.read_config('config.json')
if type(_config) is not dict:
print('[!] Invalid config, can\'t continue')
return
# start the bot
await robot.start(_config)
# start all hubots
for name in _config['accounts']:
await hubot.add_session(name, _config)
# initialize all mods
for mod in _mods:
await _mods[mod].mod_init(_config)
# set available mods in hubot
hubot.set_mods(_mods)
# wait for termination event to happen
await _termination_event.wait()
# unset available mods in hubot
hubot.unset_mods()
# deinitialize all mods
for mod in _mods:
await _mods[mod].mod_deinit()
# stop all sessions
await hubot.stop_all()
# stop the bot
await robot.stop()
if __name__ == '__main__':
asyncio.run(main())