''' Management bot on Telegram '''
import html
from telegram import Bot, Update
from telegram.ext import ApplicationBuilder, Application
from telegram.ext import MessageHandler, CommandHandler
import telegram.ext.filters as filters
import hubot
# application config
_config : dict = None
# application
_app : Application = None
# bot username
_username : str = None
async def cmd_auth_handler(update, context) -> None:
''' Callback for /auth '''
# objs
msg = update.message
uid = msg.chat.id
# not admin
if uid != _config['tg_admin_uid']:
await msg.reply_text('Permission denied')
return
# get args
args = context.args
if len(args) != 2:
await msg.reply_text('Must have two args: ACCOUNT_NAME and AUTH_DATA')
return
# account and data
account_name = args[0]
data = args[1]
#
res = await hubot.provide_auth_data(account_name, data)
if res:
await msg.reply_text('Authentication data is provided')
else:
await msg.reply_text('Failed to provide authentication data')
async def cmd_accs_handler(update, context) -> None:
''' Callback for /accs '''
# objs
msg = update.message
uid = msg.chat.id
# not admin
if uid != _config['tg_admin_uid']:
await msg.reply_text('Permission denied')
return
# prepare
accs = _config['accounts']
text = 'Accounts:'
# empty
if not accs:
text += '\n\nNo accounts are added'
await msg.reply_text(text, parse_mode='HTML')
return
# serialize
for n in accs:
a = accs[n]
text += '\n\nAccount \'%s\'' % html.escape(n)
text += '\nLogin: %s' % a['login']
text += '\nEnabled mods:'
mods = [k for k in a if k.startswith('mod_') and a[k]]
if not mods:
text += '\n no mods are enabled'
else:
for m in mods:
text += '\n - %s' % m
# reply
await msg.reply_text(text, parse_mode='HTML')
async def text_handler(update, context) -> None:
''' New message callback '''
msg = update.message
uid = msg.chat.id
await msg.reply_text('ok text')
async def start(config: dict) -> bool:
''' Start management bot '''
global _config, _app, _username
# app already exists
if _app is not None:
return False
# save the config
_config = config
# create the application
_app = ApplicationBuilder().token(_config['management_token']).build()
# add handlers
_app.add_handler(CommandHandler('auth', cmd_auth_handler))
_app.add_handler(CommandHandler('accs', cmd_accs_handler))
#_app.add_handler(MessageHandler(filters.TEXT, text_handler), 100)
# initialize it
try:
await _app.initialize()
except:
_app = None
print('[I] Can\'t initialize robot Application')
return False
# start polling
try:
await _app.updater.start_polling()
except:
await _app.shutdown()
_app = None
print('[!] Can\'t start robot polling')
return False
# start app
try:
await _app.start()
except:
await _app.updater.stop()
await _app.shutdown()
_app = None
print('[!] Can\'t start robot Application')
return False
# get our username
try:
me = await _app.bot.get_me()
_username = f'@{me.username}'
except:
_username = '@???'
# log
print('[I] Robot (%s) is started' % _username)
return True
async def stop() -> bool:
''' Stop management bot '''
global _app
# not started
if _app is None:
return True
# stop
try:
await _app.updater.stop()
await _app.stop()
await _app.shutdown()
except:
pass
# log
print('[I] Robot is stopped')
return True
async def send_to_admin(text: str, parse_mode: str = None) -> bool:
''' Send text message to admin '''
return await send_to(_config['tg_admin_uid'], text, parse_mode)
async def send_to(uid: int, text : str, parse_mode: str = None) -> bool:
''' Send text message to chat '''
# no bot
if _app is None:
return False
# send the message
try:
await _app.bot.send_message(
uid,
text,
parse_mode=parse_mode
)
return True
except:
return False