commit 40385167e32653ee1a80faa3e321858504aae231 Author: Nikita Tyukalov, ASUS, Linux Date: Sat Nov 22 23:39:39 2025 +0300 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..08593ea --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +__pycache__ +sessions +*_temp +.*.swp +.*.swo +config.json diff --git a/config.py b/config.py new file mode 100644 index 0000000..9639469 --- /dev/null +++ b/config.py @@ -0,0 +1,107 @@ +''' Config reader ''' + +import json + +def read_config(path: str) -> dict | None: + ''' Read config ''' + default = { + 'management_token': 'TELEGRAM_BOT_TOKEN_HERE', + 'tg_api_id': 12345, + 'tg_api_hash': '0123456789abcdef0123456789abcdef', + 'accounts': { + 'account1': { + 'login': '+71234567890', + 'mod_basic': True, + 'mod_eternal_online': False, + 'mod_video_downloader': True + } + } + } + # root fields + root_required_fields = { + 'management_token': [str], + 'tg_api_id': [int], + 'tg_api_hash': [str], + 'tg_admin_uid': [int], + 'accounts': [dict] + } + + # account fields + required_fields = { + 'login': [str] + } + optional_fields = { + 'mod_basic': ([bool], True), + 'mod_eternal_online': ([bool], False), + 'mod_video_downloader': ([bool], False) + } + try: + j = None + with open(path, 'r') as f: + j = json.loads(f.read()) + # root fields check + for key in root_required_fields: + if key not in j: + print('[!] Config misses key \'%s\'' % key) + return None + if type(j[key]) not in root_required_fields[key]: + print('[!] Config has key \'%s\' of invalid type: must be one of %s' % (key, root_required_fields[key])) + return None + # check all accounts + for account_name in dict(j['accounts']): + # to remove the account? + to_remove = False + + # save the account with short name + acc = j['accounts'][account_name] + # check if all required fields are present + for key in required_fields: + # missing field + if key not in acc: + to_remove = True + print('[!] Account \'%s\' misses required field \'%s\'' % (account_name, key)) + break + # invalid type + t = type(acc[key]) + if t not in required_fields[key]: + to_remove = True + print('[!] Account \'%s\' has required field of invalid type: \'%s\' must be one of %s' % (account_name, key, required_fields[key])) + break + # remove the account + if to_remove: + print('[!] Account \'%s\' will not be served until errors are fixed' % account_name) + del j[account_name] + continue + # check all optional fields + for key in optional_fields: + # get allowed types and default value + allowed_types, default_value = optional_fields[key] + # missing - add default value + if key not in acc: + print( + '[W] Account \'%s\' misses optional field \'%s\', using default value \'%s\'' % \ + (account_name, key, default_value) + ) + acc[key] = default_value + continue + # invalid type + t = type(acc[key]) + if t not in allowed_types: + print( + '[W] Account \'%s\' has optional field of invalid type: \'%s\' must be one of %s. Using default value %s' % \ + (account_name, key, types, default_value) + ) + acc[key] = default_value + continue + # log + print('[I] Added account \'%s\' (login \'%s\')' % (account_name, acc['login'])) + # return + return j + except: + try: + with open(path, 'w') as f: + f.write(json.dumps(default, indent=4)) + print('[!] Saved default config to %s' % path) + except: + print('[!] Failed to save default config to %s' % path) + return None diff --git a/hubot.py b/hubot.py new file mode 100644 index 0000000..04e8f83 --- /dev/null +++ b/hubot.py @@ -0,0 +1,233 @@ +''' HUman BOT ''' + +import asyncio + +from telethon import TelegramClient, events +import html + +import robot +import utils + +# active sessions +sessions = {} +# available mods +_mods = {} + +class AuthData: + def __init__(self, phone: str, account_name: str): + self.name = account_name + self.phone = phone + self.q = asyncio.Queue() + self.waiting = False + + async def get_phone(self) -> str: + ''' Workaround. + It is required, because TelegramClient will treat get_password and get_code + as coroutines only if coroutine is passed for phone number. + ''' + return self.phone + + async def __get_data(self, data_name: str) -> str | None: + ''' Get data using Telegram management bot ''' + # already waiting + if self.waiting: + await asyncio.sleep(1) + return None + # waiting now + self.waiting = True + # request + try: + # log + print('[I] Requesting %s for account \'%s\'...' % (data_name, self.name)) + # send message to admin + text = 'Session %s requires %s!' % (html.escape(self.name), data_name) + text += '\nSend it like this:' + text += '\n\n/auth %s [%s HERE]' % (html.escape(self.name), data_name.upper()) + text += '\n\nBe aware that Telegram has protection' \ + + ' which prohibits sharing your credentials via Telegram.' \ + + ' You have to put underscore as second symbol in order' \ + + ' to evade this protection. For example, 12345' \ + + ' becomes 1_2345.' + await robot.send_to_admin(text, parse_mode='HTML') + # wait for data + data = await self.q.get() + # remove second symbol + data = data.strip() + data = data[0] + data[2:] + self.waiting = False + return data + except: + self.waiting = False + return None + + async def get_password(self) -> str | None: + ''' Get password using Telegram bot ''' + return await self.__get_data('password') + + async def get_code(self): + ''' Get code using Telegram bot ''' + return await self.__get_data('OTP') + + async def provide_data(self, data: str) -> bool: + ''' Provide data from other tasks ''' + if not self.waiting: + return False + self.waiting = False + await self.q.put(data) + return True + +async def _cb_new_message(event) -> None: + ''' Handle new message ''' + # get the client + client = event.client + name = client.session_name + # get message text + msg_text = None + msg_word = None + try: + m = event.message + msg_text = m.message + msg_word = msg_text.split(' ')[0].lower() + except: + pass + # tasks + tasks = [] + # pass the event to all mods interested in it + for mod_name in _mods: + # mod + mod = _mods[mod_name] + try: + # the mod matches the message? + if mod.mod_get_mighty() or msg_word in mod.mod_get_tags(): + tasks.append( + asyncio.create_task( + mod.mod_new_message(sessions[name], event) + ) + ) + except: + utils.pex() + # wait for all tasks to complete + while tasks: + done, pending = await asyncio.wait(tasks) + tasks = pending + +async def _session_task(session: dict) -> None: + ''' Task that manages the session ''' + # log + print('[I] Started session \'%s\'' % session['name']) + # client + client = session['client'] + # tasks to monitor + to_monitor = {} + to_monitor['stop'] = asyncio.create_task(session['stop_event'].wait()) + to_monitor['bot_start'] = asyncio.create_task( + client.start( + session['auth_data'].get_phone, + password=session['auth_data'].get_password, + code_callback=session['auth_data'].get_code, + ) + ) + # whether to continue work + to_work = True + # session loop + while to_work: + # id is unknown and we are authorized + if 'bot_start' not in to_monitor and 'bot_get_me' not in to_monitor and session['uid'] is None: + to_monitor['bot_get_me'] = asyncio.create_task( + client.get_me() + ) + # what to wait for + aws = [to_monitor[k] for k in to_monitor] + # wait + done, pending = await asyncio.wait(aws, return_when=asyncio.FIRST_COMPLETED) + # stop + if to_monitor['stop'] in done: + to_work = False + # get_me + if 'bot_get_me' in to_monitor and to_monitor['bot_get_me'] in done: + session['uid'] = to_monitor['bot_get_me'].result().id + # delete completed tasks + for k in dict(to_monitor): + if to_monitor[k] in done: + del to_monitor[k] + # log + print('[I] Stopping session \'%s\'' % session['name']) + # cancel remaining tasks + for k in dict(to_monitor): + try: + to_monitor[k].cancel() + except: + pass + del to_monitor[k] + + +async def add_session(name: str, config: dict) -> bool: + ''' Start Telegram user session ''' + global sessions + + # sessions already exists + if name in sessions: + return False + # login + login = config['accounts'][name]['login'] + # directory 'sessions' does not exist + if not utils.ensure_dir('sessions'): + print('[!] Can\'t ensure \'sessions\' directory existance!') + return False + # client + client = TelegramClient( + 'sessions/%s' % name, + api_id = config['tg_api_id'], + api_hash = config['tg_api_hash'], + connection_retries=None, + auto_reconnect=True + ) + client.session_name = name + client.add_event_handler(_cb_new_message, events.NewMessage) + + # session info + session = { + 'uid': None, + 'config': config, + 'name': name, + 'login': login, + 'client': client, + 'stop_event': asyncio.Event(), + 'queue': asyncio.Queue(), + 'auth_data': AuthData(login, name), + 'task': None + } + # start the task + session['task'] = asyncio.create_task(_session_task(session)) + sessions[name] = session + # quit + return True + +async def stop_all() -> None: + ''' Stop all sessions ''' + global sessions + # stop all + for k in sessions: + s = sessions[k] + s['stop_event'].set() + await s['task'] + # cleanup + sessions = [] + +async def provide_auth_data(account_name: str, data: str) -> bool: + ''' Provide authentication data for account ''' + # no such account + if account_name not in sessions: + return False + # provide + return await sessions[account_name]['auth_data'].provide_data(data) + +def set_mods(mods: dict) -> None: + ''' Set dict of available mods. ''' + global _mods + _mods = dict(mods) + +def unset_mods() -> None: + ''' Remove available mods. ''' + global _mods + _mods = {} diff --git a/main.py b/main.py new file mode 100644 index 0000000..83b5609 --- /dev/null +++ b/main.py @@ -0,0 +1,88 @@ +''' Application Entry Point ''' + +import importlib +import asyncio +import signal +import os + +import utils +import robot +import hubot +import config + +# get list of mods +_mods = {} +# import all mods +for m in utils.get_all_mods(): + _mods[m] = importlib.import_module(m) + +# application config +_config = None + +# task of termination routine +_termination_task = None +# termination event +_termination_event = None + +async def set_termination_event() -> None: + ''' Just set the termination event ''' + _termination_event.set() + +def signal_handler() -> None: + ''' Start termination task ''' + global _termination_task + print() + + # already terminating - suicide + if _termination_task is not None: + print('[!] Suiciding!') + os.kill(os.getpid(), signal.SIGKILL) + # not suicide yet + print('[I] Trying to terminate gracefully...') + _termination_task = asyncio.ensure_future(set_termination_event()) + + +async def main() -> None: + global _config, _termination_event + print('[I] tg-utility') + + # create termination event + _termination_event = asyncio.Event() + + # setup the signal handlers + loop = asyncio.get_event_loop() + loop.add_signal_handler(signal.SIGINT, signal_handler) + loop.add_signal_handler(signal.SIGTERM, signal_handler) + + # load the config + _config = config.read_config('config.json') + if type(_config) is not dict: + print('[!] Invalid config, can\'t continue') + return + + # start the bot + await robot.start(_config) + # start all hubots + for name in _config['accounts']: + await hubot.add_session(name, _config) + # initialize all mods + for mod in _mods: + await _mods[mod].mod_init(_config) + # set available mods in hubot + hubot.set_mods(_mods) + + # wait for termination event to happen + await _termination_event.wait() + + # unset available mods in hubot + hubot.unset_mods() + # deinitialize all mods + for mod in _mods: + await _mods[mod].mod_deinit() + # stop all sessions + await hubot.stop_all() + # stop the bot + await robot.stop() + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/mod_basic.py b/mod_basic.py new file mode 100644 index 0000000..0b88ce4 --- /dev/null +++ b/mod_basic.py @@ -0,0 +1,55 @@ +''' Basic functions mod ''' + +import asyncio + +from telethon import events +from telethon.tl.types import PeerUser + +import utils + +_config = None + +async def mod_init(config: dict) -> bool: + ''' Initialize the mod ''' + global _config + _config = config + print('[I] mod_basic is initialized') + +async def mod_deinit() -> None: + ''' Deinitialize the mod ''' + print('[I] mod_basic is deinitialized') + +def mod_get_mighty() -> bool: + ''' Mod is called 'mighty' if it receives all messages ''' + return False + +def mod_get_tags() -> None: + ''' Get tags used by the mod ''' + return ['base'] + +async def mod_new_message(session, event) -> None: + ''' Handle new message ''' + try: + # get the message + msg = event.message + # not outgoing - do not process + if not msg.out: + return + # peer must be user + peer = msg.peer_id + if type(peer) is not PeerUser: + return + + # get the text + text = msg.message + # get args + args = [i for i in text.split(' ') if i][1:] + # no args + if not args: + response_text = 'tg-utility available mods:' + mods = utils.get_all_mods() + for mod in mods: + response_text += '\n - %s' % mod + await event.reply(message=response_text) + except: + utils.pex() diff --git a/mod_video_downloader.py b/mod_video_downloader.py new file mode 100644 index 0000000..03003ff --- /dev/null +++ b/mod_video_downloader.py @@ -0,0 +1,404 @@ +''' mod_video_downloader for tg-utility ''' + +import json +import time +import select +import shutil +import asyncio +import traceback +import subprocess +import os + +from telethon import events +from telethon.tl.types import PeerUser +from telethon.tl.types import DocumentAttributeVideo + +import utils + +# application config +_config = None +# cached video qualities +_cached_qualities = { + '03dcdde34c18d6128cf906a10365f014': { + 'url': 'https://www.pornhub.com/view_video.php?viewkey=65edb3a6aede0', + 'format': '240p', + 'ext': 'mp4' + } +} + + +def _get_all_qualities_raw(url: str, proxy: bool, timeout: float = 20) -> dict | str: + ''' Get all video qualities as dict ''' + try: + # prepare arguments + args = [ + utils.which('youtube-dl'), + '-J', + url + ] + if proxy: + args = [utils.which('proxychains4')] + args + # start the process + cp = subprocess.run( + args, + capture_output=True, + timeout=timeout + ) + # check the result + txt = cp.stdout.decode(encoding='utf-8') + try: + j = json.loads(txt) + except: + return cp.stderr.decode(encoding='ascii') + # dict to return + res = {} + # one format only - convert to many + if 'formats' not in j: + j['formats'] = [{ + 'format_id': j['format_id'], + 'ext': j['ext'] + }] + # check all formats + for f in j['formats']: + obj = { + 'url': url, + 'format': f['format_id'], + 'ext': f['ext'], + '_added': time.time() + } + cache_id = utils.get_md5('\n'.join([str(obj[i]) for i in obj if i[0] != '_'])) + _cached_qualities[cache_id] = obj + res[cache_id] = obj + return res + except subprocess.TimeoutExpired: + return 'Request timed out' + except: + return traceback.format_exc() + +async def _get_all_qualities(url: str, proxy: bool, timeout: float = 20) -> dict | str: + ''' Async version ''' + try: + return await asyncio.to_thread(_get_all_qualities_raw, url, proxy, timeout) + except: + return traceback.format_exc() + +def _download_video_raw(url: str, quality_code : str, path: str, proxy: bool, timeout: float = 20) -> str | bool: + ''' Download video from URL, use quality code to get info from cache ''' + try: + # no such quality cached + if quality_code not in _cached_qualities: + return 'Not found quality for specified CODE!' + # get data + data = _cached_qualities[quality_code] + + # prepare arguments + args = [ + utils.which('youtube-dl'), + '--newline', + '-f', + data['format'], + '-o', + path, + url + ] + if proxy: + args = [utils.which('proxychains4')] + args + # start the process + cp = subprocess.Popen( + args, + bufsize=1, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + universal_newlines=True + ) + # check output every second + last_update_time = time.time() + last_progress = '' + while True: + lines = [] + ready = True + while ready: + # check if terminated + if cp.poll() is not None: + # success + if os.path.isfile(path): + return True + # failure + else: + return 'youtube-dl has exited but output file does not exist' + break + ready, _, _ = select.select([cp.stdout], [], [], 0.1) + # data available + if ready: + l = cp.stdout.readline().strip() + # check if line starts with [download] + if not l.startswith('[download] '): + continue + # split + parts = [i for i in l.replace('[download]', '').split(' ') if i] + # no percent + if '%' not in parts[0]: + continue + # yes percent - that's progress + new_progress = parts[0] + # change + if new_progress != last_progress: + last_progress = new_progress + last_update_time = time.time() + time.sleep(1) + # timed out + if time.time() - last_update_time >= timeout: + return "Timed out" + except: + return traceback.format_exc() + +async def _download_video(url: str, quality_code : str, path: str, proxy: bool, timeout: float = 20) -> str | bool: + ''' Async version ''' + try: + return await asyncio.to_thread(_download_video_raw, url, quality_code, path, proxy, timeout) + except: + return traceback.format_exc() + +def _get_video_data_raw(path: str) -> dict | None: + ''' Get video duration, width, height and file size ''' + try: + # prepare arguments + args = [ + utils.which('ffprobe'), + '-v', + 'error', + '-select_streams', + 'v:0', + '-show_entries', + 'stream=width,height,duration', + '-show_entries', + 'format=size', + '-of', + 'default=noprint_wrappers=1:nokey=1', + path + ] + # start the process + cp = subprocess.run( + args, + capture_output=True, + timeout=10 + ) + # check the result + txt = None + try: + txt = cp.stdout.decode(encoding='ascii').strip() + except: + return None + parts = [i.strip() for i in txt.split('\n') if i.strip()] + # result + return { + 'width': int(parts[0]), + 'height': int(parts[1]), + 'duration': int(float(parts[2])), + 'size': int(parts[3]) + } + except subprocess.TimeoutExpired: + return 'ffprobe timed out' + except: + return traceback.format_exc() + +async def _get_video_data(path: str) -> dict | None: + ''' Async version ''' + try: + return await asyncio.to_thread(_get_video_data_raw, path) + except: + return traceback.format_exc() + +def _generate_thumb_raw(video: str, timestamp: int, thumb: str) -> bool: + ''' Generates a thumbnail for Telegram ''' + try: + # prepare arguments + args = [ + utils.which('ffmpeg'), + '-ss', + str(timestamp), + '-i', + video, + '-vf', + 'thumbnail,scale=\'min(320,iw)\':\'min(320,ih)\':force_original_aspect_ratio=decrease', + '-frames:v', + '1', + thumb + ] + # start the process + cp = subprocess.run( + args, + capture_output=True, + timeout=10 + ) + # check the result + txt = None + try: + txt = cp.stdout.decode(encoding='ascii').strip() + txt = cp.stderr.decode(encoding='ascii').strip() + except: + pass + parts = [i.strip() for i in txt.split('\n') if i.strip()] + # result + return os.path.isfile(thumb) + except subprocess.TimeoutExpired: + return 'ffprobe timed out' + except: + return traceback.format_exc() + return False + +async def _generate_thumb(video: str, timestamp: int, thumb: str) -> dict | None: + ''' Async version ''' + try: + return await asyncio.to_thread(_generate_thumb_raw, video, timestamp, thumb) + except: + return traceback.format_exc() + + +async def mod_init(config: dict) -> bool: + ''' Initialize the mod ''' + global _config + _config = config + # delete old temp + try: + shutil.rmtree('mvd_temp') + pass + except: + pass + utils.ensure_dir('mvd_temp') + print('[I] mod_video_downloader is initialized') + +async def mod_deinit() -> None: + ''' Deinitialize the mod ''' + # delete temp + try: + shutil.rmtree('mvd_temp') + pass + except: + pass + print('[I] mod_video_downloader is deinitialized') + +def mod_get_mighty() -> bool: + ''' Mod is called 'mighty' if it receives all messages ''' + return False + +def mod_get_tags() -> None: + ''' Get tags used by the mod ''' + return ['mvd', 'mvdl', 'mvdlp', 'mvdd', 'mvddp'] + +async def mod_new_message(session, event) -> None: + ''' Handle new message ''' + try: + # get the message + msg = event.message + # not outgoing - do not process + if not msg.out: + return + # peer must be user + peer = msg.peer_id + if type(peer) is not PeerUser: + return + + # get the text + text = msg.message + # get args + args = [i for i in text.split(' ') if i] + cmd = args[0].lower() + args = args[1:] + await asyncio.sleep(0.5) + # help + if cmd == 'mvd': + response_text = 'mod_video_downloader:' + response_text += '\n- mvdl[p] [URL] - get list of all video qualities' + response_text += '\n- mvdd[p] [CODE] - download video' + response_text += '\n\nUse \'p\' letter to utilize proxy' + await event.reply(message=response_text) + # list qualities + elif cmd.startswith('mvdl'): + if not args: + await event.reply(message='No URL!') + return + await event.reply(message='Checking URL... Please wait, you\'ll be notified if an error happens!') + qualities = await _get_all_qualities(' '.join(args), cmd[-1] == 'p') + # error + if type(qualities) is str: + await event.reply(message='Error:\n\n%s' % qualities) + return + # success + result = 'Qualities:' + for qid in qualities: + data = qualities[qid] + result += '\n\nmvdd%s %s' % ('p' if cmd[-1] == 'p' else '', qid) + result += '\n- Format: %s' % data['format'] + result += '\n- Extension: %s' % data['ext'] + await event.reply(message=result, parse_mode='HTML') + # download + elif cmd.startswith('mvdd'): + if not args: + await event.reply(message='No CODE!') + return + # get the code and check it + code = args[-1] + if code not in _cached_qualities: + await event.reply(message='This code does not exist. Use \'mvdl[p]\' to obtain the code.') + return + # get video data + data = _cached_qualities[code] + await event.reply(message='Downloading the video... Please wait, you\'ll be notified if an error happens!') + res = await _download_video(data['url'], code, 'mvd_temp/%s.bin' % code, cmd[-1] == 'p') + # res is str - error + if type(res) is str: + utils.rm_glob('mvd_temp/%s.*' % code) + await event.reply(message='Failed to download video: %s' % res) + return + # res is false + if not res: + utils.rm_glob('mvd_temp/%s.*' % code) + await event.reply(message='Something went wrong during downloading...' % res) + return + # old name and new name + old_name = 'mvd_temp/%s.bin' % code + new_name = 'mvd_temp/%s.%s' % (code, data['ext']) + # rename + try: + shutil.move(old_name, new_name) + pass + except: + utils.rm_glob('mvd_temp/%s.*' % code) + await event.reply(message='Failed to rename downloaded video') + return + # get video data + video_data = await _get_video_data(new_name) + if type(video_data) is not dict: + utils.rm_glob('mvd_temp/%s.*' % code) + await event.reply(message='Failed to use \'ffprobe\' to get video data') + return + # generate the thumbnail + thumb_name = 'mvd_temp/%s.jpg' % code + if not await _generate_thumb(new_name, int(video_data['duration'] * 0.75), thumb_name): + utils.rm_glob('mvd_temp/%s.*' % code) + await event.reply(message='Failed to generate video thumbnail') + return + # log + await event.reply(message='Video is downloaded, thumbnail is generated, uploading it to Telegram...') + # send file + try: + await event.client.send_file( + entity=peer, + file=new_name, + caption='%s' % data['url'], + mime_type=utils.get_mime(data['ext']), + file_size=video_data['size'], + thumb=thumb_name, + supports_streaming=True, + attributes=[DocumentAttributeVideo( + duration=video_data['duration'], + w=video_data['width'], + h=video_data['height'] + )] + ) + except: + await event.reply(message='Failed to upload video to telegram!') + utils.rm_glob('mvd_temp/%s.*' % code) + except: + utils.pex() diff --git a/robot.py b/robot.py new file mode 100644 index 0000000..c166029 --- /dev/null +++ b/robot.py @@ -0,0 +1,172 @@ +''' Management bot on Telegram ''' + +import html + +from telegram import Bot, Update +from telegram.ext import ApplicationBuilder, Application +from telegram.ext import MessageHandler, CommandHandler +import telegram.ext.filters as filters + +import hubot + + +# application config +_config : dict = None +# application +_app : Application = None +# bot username +_username : str = None + +async def cmd_auth_handler(update, context) -> None: + ''' Callback for /auth ''' + # objs + msg = update.message + uid = msg.chat.id + # not admin + if uid != _config['tg_admin_uid']: + await msg.reply_text('Permission denied') + return + # get args + args = context.args + if len(args) != 2: + await msg.reply_text('Must have two args: ACCOUNT_NAME and AUTH_DATA') + return + # account and data + account_name = args[0] + data = args[1] + # + res = await hubot.provide_auth_data(account_name, data) + if res: + await msg.reply_text('Authentication data is provided') + else: + await msg.reply_text('Failed to provide authentication data') + +async def cmd_accs_handler(update, context) -> None: + ''' Callback for /accs ''' + # objs + msg = update.message + uid = msg.chat.id + # not admin + if uid != _config['tg_admin_uid']: + await msg.reply_text('Permission denied') + return + # prepare + accs = _config['accounts'] + text = 'Accounts:' + # empty + if not accs: + text += '\n\nNo accounts are added' + await msg.reply_text(text, parse_mode='HTML') + return + # serialize + for n in accs: + a = accs[n] + text += '\n\nAccount \'%s\'' % html.escape(n) + text += '\nLogin: %s' % a['login'] + text += '\nEnabled mods:' + mods = [k for k in a if k.startswith('mod_') and a[k]] + if not mods: + text += '\n no mods are enabled' + else: + for m in mods: + text += '\n - %s' % m + # reply + await msg.reply_text(text, parse_mode='HTML') + + +async def text_handler(update, context) -> None: + ''' New message callback ''' + msg = update.message + uid = msg.chat.id + await msg.reply_text('ok text') + +async def start(config: dict) -> bool: + ''' Start management bot ''' + global _config, _app, _username + # app already exists + if _app is not None: + return False + # save the config + _config = config + # create the application + _app = ApplicationBuilder().token(_config['management_token']).build() + + # add handlers + _app.add_handler(CommandHandler('auth', cmd_auth_handler)) + _app.add_handler(CommandHandler('accs', cmd_accs_handler)) + #_app.add_handler(MessageHandler(filters.TEXT, text_handler), 100) + + # initialize it + try: + await _app.initialize() + except: + _app = None + print('[I] Can\'t initialize robot Application') + return False + + # start polling + try: + await _app.updater.start_polling() + except: + await _app.shutdown() + _app = None + print('[!] Can\'t start robot polling') + return False + + # start app + try: + await _app.start() + except: + await _app.updater.stop() + await _app.shutdown() + _app = None + print('[!] Can\'t start robot Application') + return False + + # get our username + try: + me = await _app.bot.get_me() + _username = f'@{me.username}' + except: + _username = '@???' + + # log + print('[I] Robot (%s) is started' % _username) + return True + +async def stop() -> bool: + ''' Stop management bot ''' + global _app + # not started + if _app is None: + return True + # stop + try: + await _app.updater.stop() + await _app.stop() + await _app.shutdown() + except: + pass + # log + print('[I] Robot is stopped') + return True + +async def send_to_admin(text: str, parse_mode: str = None) -> bool: + ''' Send text message to admin ''' + return await send_to(_config['tg_admin_uid'], text, parse_mode) + +async def send_to(uid: int, text : str, parse_mode: str = None) -> bool: + ''' Send text message to chat ''' + # no bot + if _app is None: + return False + # send the message + try: + await _app.bot.send_message( + uid, + text, + parse_mode=parse_mode + ) + return True + except: + return False diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..f279dc4 --- /dev/null +++ b/utils.py @@ -0,0 +1,71 @@ +''' Utilities ''' + +import os +import time +import glob +import shutil +import hashlib +import mimetypes +import traceback + +def pex() -> None: + ''' Print last exception ''' + traceback.print_exc() + +def ensure_dir(path: str) -> bool: + ''' Ensure directory existance. + 'path' must NOT have trailing slash! + ''' + try: + os.makedirs(path, exist_ok=True) + return True + except: + return False + +def get_script_dir() -> str: + ''' Returns path of this script (utils.py) ''' + return os.path.dirname(os.path.abspath(__file__)) + +def get_all_mods() -> list[str]: + ''' Get list of all supported mods ''' + sd = get_script_dir() + res = [f for f in os.listdir(sd) if os.path.isfile(os.path.join(sd, f))] + res = [f.split('.')[0] for f in res if f.startswith('mod_') and f.endswith('.py')] + return res + +def get_md5(data: str) -> str: + ''' Returns MD5 for data ''' + md5_hash = hashlib.md5() + md5_hash.update(str(data).encode('ascii')) + return md5_hash.hexdigest() + + +def get_unique_md5() -> str: + ''' Returns unique MD5 ''' + md5_hash = hashlib.md5() + md5_hash.update(str(time.time()).encode('ascii')) + return md5_hash.hexdigest() + +def which(cmd: str) -> str: + ''' Analogue to UNIX which ''' + return shutil.which(cmd) + +def get_mime(ext) -> str: + ''' ext must not start with dot ''' + try: + mime_type, _ = mimetypes.guess_type('file.%s' % ext) + return mime_type + except: + return 'video/%s' % ext + +def rm_glob(path_glob: str) -> None: + ''' Delete files using glob (files only) ''' + try: + files = glob.glob(path_glob) + for f in files: + try: + os.remove(f) + except: + pass + except: + pass