173 lines
4.4 KiB
Python
173 lines
4.4 KiB
Python
''' Management bot on Telegram '''
|
|
|
|
import html
|
|
|
|
from telegram import Bot, Update
|
|
from telegram.ext import ApplicationBuilder, Application
|
|
from telegram.ext import MessageHandler, CommandHandler
|
|
import telegram.ext.filters as filters
|
|
|
|
import hubot
|
|
|
|
|
|
# application config
|
|
_config : dict = None
|
|
# application
|
|
_app : Application = None
|
|
# bot username
|
|
_username : str = None
|
|
|
|
async def cmd_auth_handler(update, context) -> None:
|
|
''' Callback for /auth '''
|
|
# objs
|
|
msg = update.message
|
|
uid = msg.chat.id
|
|
# not admin
|
|
if uid != _config['tg_admin_uid']:
|
|
await msg.reply_text('Permission denied')
|
|
return
|
|
# get args
|
|
args = context.args
|
|
if len(args) != 2:
|
|
await msg.reply_text('Must have two args: ACCOUNT_NAME and AUTH_DATA')
|
|
return
|
|
# account and data
|
|
account_name = args[0]
|
|
data = args[1]
|
|
#
|
|
res = await hubot.provide_auth_data(account_name, data)
|
|
if res:
|
|
await msg.reply_text('Authentication data is provided')
|
|
else:
|
|
await msg.reply_text('Failed to provide authentication data')
|
|
|
|
async def cmd_accs_handler(update, context) -> None:
|
|
''' Callback for /accs '''
|
|
# objs
|
|
msg = update.message
|
|
uid = msg.chat.id
|
|
# not admin
|
|
if uid != _config['tg_admin_uid']:
|
|
await msg.reply_text('Permission denied')
|
|
return
|
|
# prepare
|
|
accs = _config['accounts']
|
|
text = '<b>Accounts:</b>'
|
|
# empty
|
|
if not accs:
|
|
text += '\n\n<i>No accounts are added</i>'
|
|
await msg.reply_text(text, parse_mode='HTML')
|
|
return
|
|
# serialize
|
|
for n in accs:
|
|
a = accs[n]
|
|
text += '\n\n<b>Account \'<i>%s</i>\'</b>' % html.escape(n)
|
|
text += '\n<b>Login:</b> %s' % a['login']
|
|
text += '\n<b>Enabled mods:</b>'
|
|
mods = [k for k in a if k.startswith('mod_') and a[k]]
|
|
if not mods:
|
|
text += '\n <i>no mods are enabled</i>'
|
|
else:
|
|
for m in mods:
|
|
text += '\n - <i>%s</i>' % m
|
|
# reply
|
|
await msg.reply_text(text, parse_mode='HTML')
|
|
|
|
|
|
async def text_handler(update, context) -> None:
|
|
''' New message callback '''
|
|
msg = update.message
|
|
uid = msg.chat.id
|
|
await msg.reply_text('ok text')
|
|
|
|
async def start(config: dict) -> bool:
|
|
''' Start management bot '''
|
|
global _config, _app, _username
|
|
# app already exists
|
|
if _app is not None:
|
|
return False
|
|
# save the config
|
|
_config = config
|
|
# create the application
|
|
_app = ApplicationBuilder().token(_config['management_token']).build()
|
|
|
|
# add handlers
|
|
_app.add_handler(CommandHandler('auth', cmd_auth_handler))
|
|
_app.add_handler(CommandHandler('accs', cmd_accs_handler))
|
|
#_app.add_handler(MessageHandler(filters.TEXT, text_handler), 100)
|
|
|
|
# initialize it
|
|
try:
|
|
await _app.initialize()
|
|
except:
|
|
_app = None
|
|
print('[I] Can\'t initialize robot Application')
|
|
return False
|
|
|
|
# start polling
|
|
try:
|
|
await _app.updater.start_polling()
|
|
except:
|
|
await _app.shutdown()
|
|
_app = None
|
|
print('[!] Can\'t start robot polling')
|
|
return False
|
|
|
|
# start app
|
|
try:
|
|
await _app.start()
|
|
except:
|
|
await _app.updater.stop()
|
|
await _app.shutdown()
|
|
_app = None
|
|
print('[!] Can\'t start robot Application')
|
|
return False
|
|
|
|
# get our username
|
|
try:
|
|
me = await _app.bot.get_me()
|
|
_username = f'@{me.username}'
|
|
except:
|
|
_username = '@???'
|
|
|
|
# log
|
|
print('[I] Robot (%s) is started' % _username)
|
|
return True
|
|
|
|
async def stop() -> bool:
|
|
''' Stop management bot '''
|
|
global _app
|
|
# not started
|
|
if _app is None:
|
|
return True
|
|
# stop
|
|
try:
|
|
await _app.updater.stop()
|
|
await _app.stop()
|
|
await _app.shutdown()
|
|
except:
|
|
pass
|
|
# log
|
|
print('[I] Robot is stopped')
|
|
return True
|
|
|
|
async def send_to_admin(text: str, parse_mode: str = None) -> bool:
|
|
''' Send text message to admin '''
|
|
return await send_to(_config['tg_admin_uid'], text, parse_mode)
|
|
|
|
async def send_to(uid: int, text : str, parse_mode: str = None) -> bool:
|
|
''' Send text message to chat '''
|
|
# no bot
|
|
if _app is None:
|
|
return False
|
|
# send the message
|
|
try:
|
|
await _app.bot.send_message(
|
|
uid,
|
|
text,
|
|
parse_mode=parse_mode
|
|
)
|
|
return True
|
|
except:
|
|
return False
|