'''
Management bot on Telegram

Runs a python-telegram-bot Application in polling mode and exposes:
  /auth ACCOUNT_NAME AUTH_DATA - forward authentication data to hubot
  /accs                        - list configured accounts and their mods
Both commands are restricted to the chat whose id equals
config['tg_admin_uid'].
'''

import html
from typing import Optional

from telegram import Bot, Update
from telegram.ext import ApplicationBuilder, Application
from telegram.ext import MessageHandler, CommandHandler
import telegram.ext.filters as filters

import hubot

# application config (set by start())
_config : Optional[dict] = None
# application (None while the bot is not running)
_app : Optional[Application] = None
# bot username
_username : Optional[str] = None


async def _reject_non_admin(msg) -> bool:
    '''
    Reply 'Permission denied' unless the message comes from the admin chat.

    Returns True when the message was rejected (caller must stop handling),
    False when the sender chat id matches config['tg_admin_uid'].
    A missing config or missing key is treated as "not admin" (fail closed).
    '''
    admin_uid = None if _config is None else _config.get('tg_admin_uid')
    if admin_uid is not None and msg.chat.id == admin_uid:
        return False

    await msg.reply_text('Permission denied')
    return True


async def cmd_auth_handler(update: Update, context) -> None:
    '''
    Callback for /auth

    Expects exactly two arguments (ACCOUNT_NAME and AUTH_DATA) and passes
    them to hubot.provide_auth_data(); replies with the outcome.
    '''
    # effective_message also covers edited messages, for which
    # update.message is None
    msg = update.effective_message
    if msg is None:
        return

    # not admin
    if await _reject_non_admin(msg):
        return

    # get args
    args = context.args or []
    if len(args) != 2:
        await msg.reply_text('Must have two args: ACCOUNT_NAME and AUTH_DATA')
        return

    # account and data
    account_name, data = args

    # hand the data over and report the result
    res = await hubot.provide_auth_data(account_name, data)
    if res:
        await msg.reply_text('Authentication data is provided')
    else:
        await msg.reply_text('Failed to provide authentication data')


async def cmd_accs_handler(update: Update, context) -> None:
    '''
    Callback for /accs

    Replies with one section per entry of config['accounts']: the account
    name, its login and every truthy key starting with 'mod_'.
    '''
    msg = update.effective_message
    if msg is None:
        return

    # not admin
    if await _reject_non_admin(msg):
        return

    # prepare
    accs = _config.get('accounts')
    lines = ['Accounts:']

    # empty
    if not accs:
        lines += ['', 'No accounts are added']
        await msg.reply_text('\n'.join(lines), parse_mode='HTML')
        return

    # serialize; every interpolated value is escaped because the reply is
    # sent with parse_mode='HTML'
    for name, acc in accs.items():
        lines.append('')
        lines.append('Account \'%s\'' % html.escape(str(name)))
        lines.append('Login: %s' % html.escape(str(acc['login'])))
        lines.append('Enabled mods:')
        mods = [k for k in acc if k.startswith('mod_') and acc[k]]
        if not mods:
            lines.append(' no mods are enabled')
        else:
            lines.extend(' - %s' % html.escape(m) for m in mods)

    # reply
    await msg.reply_text('\n'.join(lines), parse_mode='HTML')


async def text_handler(update: Update, context) -> None:
    ''' New message callback (currently not registered, see start()) '''
    msg = update.effective_message
    if msg is None:
        return
    await msg.reply_text('ok text')


async def _abort_start(message: str, *cleanups) -> bool:
    '''
    Undo a partially completed start().

    Clears the module-level application first, so a failing cleanup step
    can never leave a half-started app behind, then awaits each cleanup
    coroutine function in order, logs the message and returns False.
    '''
    global _app
    _app = None

    for cleanup in cleanups:
        try:
            await cleanup()
        except Exception:
            # best effort: the start has already failed
            pass

    print(message)
    return False


async def start(config: dict) -> bool:
    '''
    Start management bot

    Builds the Application from config['management_token'], registers the
    command handlers, then initializes it, starts polling and starts the
    application. Returns True on success and False if the bot is already
    started or any start step fails (completed steps are rolled back).
    '''
    global _config, _app, _username

    # app already exists
    if _app is not None:
        return False

    # save the config
    _config = config

    # create the application
    _app = ApplicationBuilder().token(_config['management_token']).build()

    # add handlers
    _app.add_handler(CommandHandler('auth', cmd_auth_handler))
    _app.add_handler(CommandHandler('accs', cmd_accs_handler))
    #_app.add_handler(MessageHandler(filters.TEXT, text_handler), 100)

    # initialize it
    try:
        await _app.initialize()
    except Exception:
        return await _abort_start(
            '[I] Can\'t initialize robot Application'
        )

    # start polling
    try:
        await _app.updater.start_polling()
    except Exception:
        return await _abort_start(
            '[!] Can\'t start robot polling',
            _app.shutdown
        )

    # start app
    try:
        await _app.start()
    except Exception:
        return await _abort_start(
            '[!] Can\'t start robot Application',
            _app.updater.stop, _app.shutdown
        )

    # get our username (only used for the log line below)
    try:
        me = await _app.bot.get_me()
        _username = f'@{me.username}'
    except Exception:
        _username = '@???'

    # log
    print('[I] Robot (%s) is started' % _username)
    return True


async def stop() -> bool:
    '''
    Stop management bot

    Stops polling, stops the application and shuts it down. Each step is
    attempted even if a previous one failed. Always returns True; after the
    call the bot can be started again with start().
    '''
    global _app

    # not started
    if _app is None:
        return True

    # detach first so start() works again and send_to() stops using the app
    app, _app = _app, None

    # stop
    for step in (app.updater.stop, app.stop, app.shutdown):
        try:
            await step()
        except Exception as exc:
            print('[!] Robot stop step failed: %s' % exc)

    # log
    print('[I] Robot is stopped')
    return True


async def send_to_admin(text: str, parse_mode: str = None) -> bool:
    '''
    Send text message to admin

    Returns False when the bot has not been configured yet, otherwise the
    result of send_to() for config['tg_admin_uid'].
    '''
    # start() was never called
    if _config is None:
        return False

    return await send_to(_config['tg_admin_uid'], text, parse_mode)


async def send_to(uid: int, text : str, parse_mode: str = None) -> bool:
    '''
    Send text message to chat

    Returns True if the message was sent, False if the bot is not running
    or sending failed.
    '''
    # no bot
    if _app is None:
        return False

    # send the message
    try:
        await _app.bot.send_message(
            uid, text,
            parse_mode=parse_mode
        )
    except Exception:
        return False

    return True