Initial commit

This commit is contained in:
Nikita Tyukalov, ASUS, Linux
2025-11-22 23:39:39 +03:00
commit 40385167e3
8 changed files with 1136 additions and 0 deletions

6
.gitignore vendored Normal file
View File

@@ -0,0 +1,6 @@
__pycache__
sessions
*_temp
.*.swp
.*.swo
config.json

107
config.py Normal file
View File

@@ -0,0 +1,107 @@
''' Config reader '''
import json
def read_config(path: str) -> dict | None:
''' Read config '''
default = {
'management_token': 'TELEGRAM_BOT_TOKEN_HERE',
'tg_api_id': 12345,
'tg_api_hash': '0123456789abcdef0123456789abcdef',
'accounts': {
'account1': {
'login': '+71234567890',
'mod_basic': True,
'mod_eternal_online': False,
'mod_video_downloader': True
}
}
}
# root fields
root_required_fields = {
'management_token': [str],
'tg_api_id': [int],
'tg_api_hash': [str],
'tg_admin_uid': [int],
'accounts': [dict]
}
# account fields
required_fields = {
'login': [str]
}
optional_fields = {
'mod_basic': ([bool], True),
'mod_eternal_online': ([bool], False),
'mod_video_downloader': ([bool], False)
}
try:
j = None
with open(path, 'r') as f:
j = json.loads(f.read())
# root fields check
for key in root_required_fields:
if key not in j:
print('[!] Config misses key \'%s\'' % key)
return None
if type(j[key]) not in root_required_fields[key]:
print('[!] Config has key \'%s\' of invalid type: must be one of %s' % (key, root_required_fields[key]))
return None
# check all accounts
for account_name in dict(j['accounts']):
# to remove the account?
to_remove = False
# save the account with short name
acc = j['accounts'][account_name]
# check if all required fields are present
for key in required_fields:
# missing field
if key not in acc:
to_remove = True
print('[!] Account \'%s\' misses required field \'%s\'' % (account_name, key))
break
# invalid type
t = type(acc[key])
if t not in required_fields[key]:
to_remove = True
print('[!] Account \'%s\' has required field of invalid type: \'%s\' must be one of %s' % (account_name, key, required_fields[key]))
break
# remove the account
if to_remove:
print('[!] Account \'%s\' will not be served until errors are fixed' % account_name)
del j[account_name]
continue
# check all optional fields
for key in optional_fields:
# get allowed types and default value
allowed_types, default_value = optional_fields[key]
# missing - add default value
if key not in acc:
print(
'[W] Account \'%s\' misses optional field \'%s\', using default value \'%s\'' % \
(account_name, key, default_value)
)
acc[key] = default_value
continue
# invalid type
t = type(acc[key])
if t not in allowed_types:
print(
'[W] Account \'%s\' has optional field of invalid type: \'%s\' must be one of %s. Using default value %s' % \
(account_name, key, types, default_value)
)
acc[key] = default_value
continue
# log
print('[I] Added account \'%s\' (login \'%s\')' % (account_name, acc['login']))
# return
return j
except:
try:
with open(path, 'w') as f:
f.write(json.dumps(default, indent=4))
print('[!] Saved default config to %s' % path)
except:
print('[!] Failed to save default config to %s' % path)
return None

233
hubot.py Normal file
View File

@@ -0,0 +1,233 @@
''' HUman BOT '''
import asyncio
from telethon import TelegramClient, events
import html
import robot
import utils
# active sessions
sessions = {}
# available mods
_mods = {}
class AuthData:
def __init__(self, phone: str, account_name: str):
self.name = account_name
self.phone = phone
self.q = asyncio.Queue()
self.waiting = False
async def get_phone(self) -> str:
''' Workaround.
It is required, because TelegramClient will treat get_password and get_code
as coroutines only if coroutine is passed for phone number.
'''
return self.phone
async def __get_data(self, data_name: str) -> str | None:
''' Get data using Telegram management bot '''
# already waiting
if self.waiting:
await asyncio.sleep(1)
return None
# waiting now
self.waiting = True
# request
try:
# log
print('[I] Requesting %s for account \'%s\'...' % (data_name, self.name))
# send message to admin
text = '<b>Session %s requires %s!</b>' % (html.escape(self.name), data_name)
text += '\nSend it like this:'
text += '\n\n<code>/auth %s [%s HERE]</code>' % (html.escape(self.name), data_name.upper())
text += '\n\nBe aware that Telegram has protection' \
+ ' which prohibits sharing your credentials via Telegram.' \
+ ' You have to put underscore as second symbol in order' \
+ ' to evade this protection. For example, <code>12345</code>' \
+ ' becomes <code>1_2345</code>.'
await robot.send_to_admin(text, parse_mode='HTML')
# wait for data
data = await self.q.get()
# remove second symbol
data = data.strip()
data = data[0] + data[2:]
self.waiting = False
return data
except:
self.waiting = False
return None
async def get_password(self) -> str | None:
''' Get password using Telegram bot '''
return await self.__get_data('password')
async def get_code(self):
''' Get code using Telegram bot '''
return await self.__get_data('OTP')
async def provide_data(self, data: str) -> bool:
''' Provide data from other tasks '''
if not self.waiting:
return False
self.waiting = False
await self.q.put(data)
return True
async def _cb_new_message(event) -> None:
''' Handle new message '''
# get the client
client = event.client
name = client.session_name
# get message text
msg_text = None
msg_word = None
try:
m = event.message
msg_text = m.message
msg_word = msg_text.split(' ')[0].lower()
except:
pass
# tasks
tasks = []
# pass the event to all mods interested in it
for mod_name in _mods:
# mod
mod = _mods[mod_name]
try:
# the mod matches the message?
if mod.mod_get_mighty() or msg_word in mod.mod_get_tags():
tasks.append(
asyncio.create_task(
mod.mod_new_message(sessions[name], event)
)
)
except:
utils.pex()
# wait for all tasks to complete
while tasks:
done, pending = await asyncio.wait(tasks)
tasks = pending
async def _session_task(session: dict) -> None:
''' Task that manages the session '''
# log
print('[I] Started session \'%s\'' % session['name'])
# client
client = session['client']
# tasks to monitor
to_monitor = {}
to_monitor['stop'] = asyncio.create_task(session['stop_event'].wait())
to_monitor['bot_start'] = asyncio.create_task(
client.start(
session['auth_data'].get_phone,
password=session['auth_data'].get_password,
code_callback=session['auth_data'].get_code,
)
)
# whether to continue work
to_work = True
# session loop
while to_work:
# id is unknown and we are authorized
if 'bot_start' not in to_monitor and 'bot_get_me' not in to_monitor and session['uid'] is None:
to_monitor['bot_get_me'] = asyncio.create_task(
client.get_me()
)
# what to wait for
aws = [to_monitor[k] for k in to_monitor]
# wait
done, pending = await asyncio.wait(aws, return_when=asyncio.FIRST_COMPLETED)
# stop
if to_monitor['stop'] in done:
to_work = False
# get_me
if 'bot_get_me' in to_monitor and to_monitor['bot_get_me'] in done:
session['uid'] = to_monitor['bot_get_me'].result().id
# delete completed tasks
for k in dict(to_monitor):
if to_monitor[k] in done:
del to_monitor[k]
# log
print('[I] Stopping session \'%s\'' % session['name'])
# cancel remaining tasks
for k in dict(to_monitor):
try:
to_monitor[k].cancel()
except:
pass
del to_monitor[k]
async def add_session(name: str, config: dict) -> bool:
''' Start Telegram user session '''
global sessions
# sessions already exists
if name in sessions:
return False
# login
login = config['accounts'][name]['login']
# directory 'sessions' does not exist
if not utils.ensure_dir('sessions'):
print('[!] Can\'t ensure \'sessions\' directory existance!')
return False
# client
client = TelegramClient(
'sessions/%s' % name,
api_id = config['tg_api_id'],
api_hash = config['tg_api_hash'],
connection_retries=None,
auto_reconnect=True
)
client.session_name = name
client.add_event_handler(_cb_new_message, events.NewMessage)
# session info
session = {
'uid': None,
'config': config,
'name': name,
'login': login,
'client': client,
'stop_event': asyncio.Event(),
'queue': asyncio.Queue(),
'auth_data': AuthData(login, name),
'task': None
}
# start the task
session['task'] = asyncio.create_task(_session_task(session))
sessions[name] = session
# quit
return True
async def stop_all() -> None:
''' Stop all sessions '''
global sessions
# stop all
for k in sessions:
s = sessions[k]
s['stop_event'].set()
await s['task']
# cleanup
sessions = []
async def provide_auth_data(account_name: str, data: str) -> bool:
''' Provide authentication data for account '''
# no such account
if account_name not in sessions:
return False
# provide
return await sessions[account_name]['auth_data'].provide_data(data)
def set_mods(mods: dict) -> None:
''' Set dict of available mods. '''
global _mods
_mods = dict(mods)
def unset_mods() -> None:
''' Remove available mods. '''
global _mods
_mods = {}

88
main.py Normal file
View File

@@ -0,0 +1,88 @@
''' Application Entry Point '''
import importlib
import asyncio
import signal
import os
import utils
import robot
import hubot
import config
# get list of mods
_mods = {}
# import all mods
for m in utils.get_all_mods():
_mods[m] = importlib.import_module(m)
# application config
_config = None
# task of termination routine
_termination_task = None
# termination event
_termination_event = None
async def set_termination_event() -> None:
''' Just set the termination event '''
_termination_event.set()
def signal_handler() -> None:
''' Start termination task '''
global _termination_task
print()
# already terminating - suicide
if _termination_task is not None:
print('[!] Suiciding!')
os.kill(os.getpid(), signal.SIGKILL)
# not suicide yet
print('[I] Trying to terminate gracefully...')
_termination_task = asyncio.ensure_future(set_termination_event())
async def main() -> None:
global _config, _termination_event
print('[I] tg-utility')
# create termination event
_termination_event = asyncio.Event()
# setup the signal handlers
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, signal_handler)
loop.add_signal_handler(signal.SIGTERM, signal_handler)
# load the config
_config = config.read_config('config.json')
if type(_config) is not dict:
print('[!] Invalid config, can\'t continue')
return
# start the bot
await robot.start(_config)
# start all hubots
for name in _config['accounts']:
await hubot.add_session(name, _config)
# initialize all mods
for mod in _mods:
await _mods[mod].mod_init(_config)
# set available mods in hubot
hubot.set_mods(_mods)
# wait for termination event to happen
await _termination_event.wait()
# unset available mods in hubot
hubot.unset_mods()
# deinitialize all mods
for mod in _mods:
await _mods[mod].mod_deinit()
# stop all sessions
await hubot.stop_all()
# stop the bot
await robot.stop()
if __name__ == '__main__':
asyncio.run(main())

55
mod_basic.py Normal file
View File

@@ -0,0 +1,55 @@
''' Basic functions mod '''
import asyncio
from telethon import events
from telethon.tl.types import PeerUser
import utils
_config = None
async def mod_init(config: dict) -> bool:
''' Initialize the mod '''
global _config
_config = config
print('[I] mod_basic is initialized')
async def mod_deinit() -> None:
''' Deinitialize the mod '''
print('[I] mod_basic is deinitialized')
def mod_get_mighty() -> bool:
''' Mod is called 'mighty' if it receives all messages '''
return False
def mod_get_tags() -> None:
''' Get tags used by the mod '''
return ['base']
async def mod_new_message(session, event) -> None:
''' Handle new message '''
try:
# get the message
msg = event.message
# not outgoing - do not process
if not msg.out:
return
# peer must be user
peer = msg.peer_id
if type(peer) is not PeerUser:
return
# get the text
text = msg.message
# get args
args = [i for i in text.split(' ') if i][1:]
# no args
if not args:
response_text = 'tg-utility available mods:'
mods = utils.get_all_mods()
for mod in mods:
response_text += '\n - %s' % mod
await event.reply(message=response_text)
except:
utils.pex()

404
mod_video_downloader.py Normal file
View File

@@ -0,0 +1,404 @@
''' mod_video_downloader for tg-utility '''
import json
import time
import select
import shutil
import asyncio
import traceback
import subprocess
import os
from telethon import events
from telethon.tl.types import PeerUser
from telethon.tl.types import DocumentAttributeVideo
import utils
# application config
_config = None
# cached video qualities
_cached_qualities = {
'03dcdde34c18d6128cf906a10365f014': {
'url': 'https://www.pornhub.com/view_video.php?viewkey=65edb3a6aede0',
'format': '240p',
'ext': 'mp4'
}
}
def _get_all_qualities_raw(url: str, proxy: bool, timeout: float = 20) -> dict | str:
''' Get all video qualities as dict '''
try:
# prepare arguments
args = [
utils.which('youtube-dl'),
'-J',
url
]
if proxy:
args = [utils.which('proxychains4')] + args
# start the process
cp = subprocess.run(
args,
capture_output=True,
timeout=timeout
)
# check the result
txt = cp.stdout.decode(encoding='utf-8')
try:
j = json.loads(txt)
except:
return cp.stderr.decode(encoding='ascii')
# dict to return
res = {}
# one format only - convert to many
if 'formats' not in j:
j['formats'] = [{
'format_id': j['format_id'],
'ext': j['ext']
}]
# check all formats
for f in j['formats']:
obj = {
'url': url,
'format': f['format_id'],
'ext': f['ext'],
'_added': time.time()
}
cache_id = utils.get_md5('\n'.join([str(obj[i]) for i in obj if i[0] != '_']))
_cached_qualities[cache_id] = obj
res[cache_id] = obj
return res
except subprocess.TimeoutExpired:
return 'Request timed out'
except:
return traceback.format_exc()
async def _get_all_qualities(url: str, proxy: bool, timeout: float = 20) -> dict | str:
''' Async version '''
try:
return await asyncio.to_thread(_get_all_qualities_raw, url, proxy, timeout)
except:
return traceback.format_exc()
def _download_video_raw(url: str, quality_code : str, path: str, proxy: bool, timeout: float = 20) -> str | bool:
''' Download video from URL, use quality code to get info from cache '''
try:
# no such quality cached
if quality_code not in _cached_qualities:
return 'Not found quality for specified CODE!'
# get data
data = _cached_qualities[quality_code]
# prepare arguments
args = [
utils.which('youtube-dl'),
'--newline',
'-f',
data['format'],
'-o',
path,
url
]
if proxy:
args = [utils.which('proxychains4')] + args
# start the process
cp = subprocess.Popen(
args,
bufsize=1,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
universal_newlines=True
)
# check output every second
last_update_time = time.time()
last_progress = ''
while True:
lines = []
ready = True
while ready:
# check if terminated
if cp.poll() is not None:
# success
if os.path.isfile(path):
return True
# failure
else:
return 'youtube-dl has exited but output file does not exist'
break
ready, _, _ = select.select([cp.stdout], [], [], 0.1)
# data available
if ready:
l = cp.stdout.readline().strip()
# check if line starts with [download]
if not l.startswith('[download] '):
continue
# split
parts = [i for i in l.replace('[download]', '').split(' ') if i]
# no percent
if '%' not in parts[0]:
continue
# yes percent - that's progress
new_progress = parts[0]
# change
if new_progress != last_progress:
last_progress = new_progress
last_update_time = time.time()
time.sleep(1)
# timed out
if time.time() - last_update_time >= timeout:
return "Timed out"
except:
return traceback.format_exc()
async def _download_video(url: str, quality_code : str, path: str, proxy: bool, timeout: float = 20) -> str | bool:
''' Async version '''
try:
return await asyncio.to_thread(_download_video_raw, url, quality_code, path, proxy, timeout)
except:
return traceback.format_exc()
def _get_video_data_raw(path: str) -> dict | None:
''' Get video duration, width, height and file size '''
try:
# prepare arguments
args = [
utils.which('ffprobe'),
'-v',
'error',
'-select_streams',
'v:0',
'-show_entries',
'stream=width,height,duration',
'-show_entries',
'format=size',
'-of',
'default=noprint_wrappers=1:nokey=1',
path
]
# start the process
cp = subprocess.run(
args,
capture_output=True,
timeout=10
)
# check the result
txt = None
try:
txt = cp.stdout.decode(encoding='ascii').strip()
except:
return None
parts = [i.strip() for i in txt.split('\n') if i.strip()]
# result
return {
'width': int(parts[0]),
'height': int(parts[1]),
'duration': int(float(parts[2])),
'size': int(parts[3])
}
except subprocess.TimeoutExpired:
return 'ffprobe timed out'
except:
return traceback.format_exc()
async def _get_video_data(path: str) -> dict | None:
''' Async version '''
try:
return await asyncio.to_thread(_get_video_data_raw, path)
except:
return traceback.format_exc()
def _generate_thumb_raw(video: str, timestamp: int, thumb: str) -> bool:
''' Generates a thumbnail for Telegram '''
try:
# prepare arguments
args = [
utils.which('ffmpeg'),
'-ss',
str(timestamp),
'-i',
video,
'-vf',
'thumbnail,scale=\'min(320,iw)\':\'min(320,ih)\':force_original_aspect_ratio=decrease',
'-frames:v',
'1',
thumb
]
# start the process
cp = subprocess.run(
args,
capture_output=True,
timeout=10
)
# check the result
txt = None
try:
txt = cp.stdout.decode(encoding='ascii').strip()
txt = cp.stderr.decode(encoding='ascii').strip()
except:
pass
parts = [i.strip() for i in txt.split('\n') if i.strip()]
# result
return os.path.isfile(thumb)
except subprocess.TimeoutExpired:
return 'ffprobe timed out'
except:
return traceback.format_exc()
return False
async def _generate_thumb(video: str, timestamp: int, thumb: str) -> dict | None:
''' Async version '''
try:
return await asyncio.to_thread(_generate_thumb_raw, video, timestamp, thumb)
except:
return traceback.format_exc()
async def mod_init(config: dict) -> bool:
''' Initialize the mod '''
global _config
_config = config
# delete old temp
try:
shutil.rmtree('mvd_temp')
pass
except:
pass
utils.ensure_dir('mvd_temp')
print('[I] mod_video_downloader is initialized')
async def mod_deinit() -> None:
''' Deinitialize the mod '''
# delete temp
try:
shutil.rmtree('mvd_temp')
pass
except:
pass
print('[I] mod_video_downloader is deinitialized')
def mod_get_mighty() -> bool:
''' Mod is called 'mighty' if it receives all messages '''
return False
def mod_get_tags() -> None:
''' Get tags used by the mod '''
return ['mvd', 'mvdl', 'mvdlp', 'mvdd', 'mvddp']
async def mod_new_message(session, event) -> None:
''' Handle new message '''
try:
# get the message
msg = event.message
# not outgoing - do not process
if not msg.out:
return
# peer must be user
peer = msg.peer_id
if type(peer) is not PeerUser:
return
# get the text
text = msg.message
# get args
args = [i for i in text.split(' ') if i]
cmd = args[0].lower()
args = args[1:]
await asyncio.sleep(0.5)
# help
if cmd == 'mvd':
response_text = 'mod_video_downloader:'
response_text += '\n- mvdl[p] [URL] - get list of all video qualities'
response_text += '\n- mvdd[p] [CODE] - download video'
response_text += '\n\nUse \'p\' letter to utilize proxy'
await event.reply(message=response_text)
# list qualities
elif cmd.startswith('mvdl'):
if not args:
await event.reply(message='No URL!')
return
await event.reply(message='Checking URL... Please wait, you\'ll be notified if an error happens!')
qualities = await _get_all_qualities(' '.join(args), cmd[-1] == 'p')
# error
if type(qualities) is str:
await event.reply(message='Error:\n\n%s' % qualities)
return
# success
result = 'Qualities:'
for qid in qualities:
data = qualities[qid]
result += '\n\n<code>mvdd%s %s</code>' % ('p' if cmd[-1] == 'p' else '', qid)
result += '\n- Format: %s' % data['format']
result += '\n- Extension: %s' % data['ext']
await event.reply(message=result, parse_mode='HTML')
# download
elif cmd.startswith('mvdd'):
if not args:
await event.reply(message='No CODE!')
return
# get the code and check it
code = args[-1]
if code not in _cached_qualities:
await event.reply(message='This code does not exist. Use \'mvdl[p]\' to obtain the code.')
return
# get video data
data = _cached_qualities[code]
await event.reply(message='Downloading the video... Please wait, you\'ll be notified if an error happens!')
res = await _download_video(data['url'], code, 'mvd_temp/%s.bin' % code, cmd[-1] == 'p')
# res is str - error
if type(res) is str:
utils.rm_glob('mvd_temp/%s.*' % code)
await event.reply(message='Failed to download video: %s' % res)
return
# res is false
if not res:
utils.rm_glob('mvd_temp/%s.*' % code)
await event.reply(message='Something went wrong during downloading...' % res)
return
# old name and new name
old_name = 'mvd_temp/%s.bin' % code
new_name = 'mvd_temp/%s.%s' % (code, data['ext'])
# rename
try:
shutil.move(old_name, new_name)
pass
except:
utils.rm_glob('mvd_temp/%s.*' % code)
await event.reply(message='Failed to rename downloaded video')
return
# get video data
video_data = await _get_video_data(new_name)
if type(video_data) is not dict:
utils.rm_glob('mvd_temp/%s.*' % code)
await event.reply(message='Failed to use \'ffprobe\' to get video data')
return
# generate the thumbnail
thumb_name = 'mvd_temp/%s.jpg' % code
if not await _generate_thumb(new_name, int(video_data['duration'] * 0.75), thumb_name):
utils.rm_glob('mvd_temp/%s.*' % code)
await event.reply(message='Failed to generate video thumbnail')
return
# log
await event.reply(message='Video is downloaded, thumbnail is generated, uploading it to Telegram...')
# send file
try:
await event.client.send_file(
entity=peer,
file=new_name,
caption='%s' % data['url'],
mime_type=utils.get_mime(data['ext']),
file_size=video_data['size'],
thumb=thumb_name,
supports_streaming=True,
attributes=[DocumentAttributeVideo(
duration=video_data['duration'],
w=video_data['width'],
h=video_data['height']
)]
)
except:
await event.reply(message='Failed to upload video to telegram!')
utils.rm_glob('mvd_temp/%s.*' % code)
except:
utils.pex()

172
robot.py Normal file
View File

@@ -0,0 +1,172 @@
''' Management bot on Telegram '''
import html
from telegram import Bot, Update
from telegram.ext import ApplicationBuilder, Application
from telegram.ext import MessageHandler, CommandHandler
import telegram.ext.filters as filters
import hubot
# application config
_config : dict = None
# application
_app : Application = None
# bot username
_username : str = None
async def cmd_auth_handler(update, context) -> None:
''' Callback for /auth '''
# objs
msg = update.message
uid = msg.chat.id
# not admin
if uid != _config['tg_admin_uid']:
await msg.reply_text('Permission denied')
return
# get args
args = context.args
if len(args) != 2:
await msg.reply_text('Must have two args: ACCOUNT_NAME and AUTH_DATA')
return
# account and data
account_name = args[0]
data = args[1]
#
res = await hubot.provide_auth_data(account_name, data)
if res:
await msg.reply_text('Authentication data is provided')
else:
await msg.reply_text('Failed to provide authentication data')
async def cmd_accs_handler(update, context) -> None:
''' Callback for /accs '''
# objs
msg = update.message
uid = msg.chat.id
# not admin
if uid != _config['tg_admin_uid']:
await msg.reply_text('Permission denied')
return
# prepare
accs = _config['accounts']
text = '<b>Accounts:</b>'
# empty
if not accs:
text += '\n\n<i>No accounts are added</i>'
await msg.reply_text(text, parse_mode='HTML')
return
# serialize
for n in accs:
a = accs[n]
text += '\n\n<b>Account \'<i>%s</i>\'</b>' % html.escape(n)
text += '\n<b>Login:</b> %s' % a['login']
text += '\n<b>Enabled mods:</b>'
mods = [k for k in a if k.startswith('mod_') and a[k]]
if not mods:
text += '\n <i>no mods are enabled</i>'
else:
for m in mods:
text += '\n - <i>%s</i>' % m
# reply
await msg.reply_text(text, parse_mode='HTML')
async def text_handler(update, context) -> None:
''' New message callback '''
msg = update.message
uid = msg.chat.id
await msg.reply_text('ok text')
async def start(config: dict) -> bool:
''' Start management bot '''
global _config, _app, _username
# app already exists
if _app is not None:
return False
# save the config
_config = config
# create the application
_app = ApplicationBuilder().token(_config['management_token']).build()
# add handlers
_app.add_handler(CommandHandler('auth', cmd_auth_handler))
_app.add_handler(CommandHandler('accs', cmd_accs_handler))
#_app.add_handler(MessageHandler(filters.TEXT, text_handler), 100)
# initialize it
try:
await _app.initialize()
except:
_app = None
print('[I] Can\'t initialize robot Application')
return False
# start polling
try:
await _app.updater.start_polling()
except:
await _app.shutdown()
_app = None
print('[!] Can\'t start robot polling')
return False
# start app
try:
await _app.start()
except:
await _app.updater.stop()
await _app.shutdown()
_app = None
print('[!] Can\'t start robot Application')
return False
# get our username
try:
me = await _app.bot.get_me()
_username = f'@{me.username}'
except:
_username = '@???'
# log
print('[I] Robot (%s) is started' % _username)
return True
async def stop() -> bool:
''' Stop management bot '''
global _app
# not started
if _app is None:
return True
# stop
try:
await _app.updater.stop()
await _app.stop()
await _app.shutdown()
except:
pass
# log
print('[I] Robot is stopped')
return True
async def send_to_admin(text: str, parse_mode: str = None) -> bool:
''' Send text message to admin '''
return await send_to(_config['tg_admin_uid'], text, parse_mode)
async def send_to(uid: int, text : str, parse_mode: str = None) -> bool:
''' Send text message to chat '''
# no bot
if _app is None:
return False
# send the message
try:
await _app.bot.send_message(
uid,
text,
parse_mode=parse_mode
)
return True
except:
return False

71
utils.py Normal file
View File

@@ -0,0 +1,71 @@
''' Utilities '''
import os
import time
import glob
import shutil
import hashlib
import mimetypes
import traceback
def pex() -> None:
''' Print last exception '''
traceback.print_exc()
def ensure_dir(path: str) -> bool:
''' Ensure directory existance.
'path' must NOT have trailing slash!
'''
try:
os.makedirs(path, exist_ok=True)
return True
except:
return False
def get_script_dir() -> str:
''' Returns path of this script (utils.py) '''
return os.path.dirname(os.path.abspath(__file__))
def get_all_mods() -> list[str]:
''' Get list of all supported mods '''
sd = get_script_dir()
res = [f for f in os.listdir(sd) if os.path.isfile(os.path.join(sd, f))]
res = [f.split('.')[0] for f in res if f.startswith('mod_') and f.endswith('.py')]
return res
def get_md5(data: str) -> str:
''' Returns MD5 for data '''
md5_hash = hashlib.md5()
md5_hash.update(str(data).encode('ascii'))
return md5_hash.hexdigest()
def get_unique_md5() -> str:
''' Returns unique MD5 '''
md5_hash = hashlib.md5()
md5_hash.update(str(time.time()).encode('ascii'))
return md5_hash.hexdigest()
def which(cmd: str) -> str:
''' Analogue to UNIX which '''
return shutil.which(cmd)
def get_mime(ext) -> str:
''' ext must not start with dot '''
try:
mime_type, _ = mimetypes.guess_type('file.%s' % ext)
return mime_type
except:
return 'video/%s' % ext
def rm_glob(path_glob: str) -> None:
''' Delete files using glob (files only) '''
try:
files = glob.glob(path_glob)
for f in files:
try:
os.remove(f)
except:
pass
except:
pass