Files
tg-utility/hubot.py
2025-11-25 04:55:47 +03:00

242 lines
7.2 KiB
Python

''' HUman BOT '''
import asyncio
from telethon import TelegramClient, events
import html
import robot
import utils
# active sessions: account name -> session dict registered by add_session()
sessions = {}
# available mods: mod name -> mod object; replaced as a whole by
# set_mods() / unset_mods()
_mods = {}
class AuthData:
def __init__(self, phone: str, account_name: str):
self.name = account_name
self.phone = phone
self.q = asyncio.Queue()
self.waiting = False
async def get_phone(self) -> str:
''' Workaround.
It is required, because TelegramClient will treat get_password and get_code
as coroutines only if coroutine is passed for phone number.
'''
return self.phone
async def __get_data(self, data_name: str) -> str | None:
''' Get data using Telegram management bot '''
# already waiting
if self.waiting:
await asyncio.sleep(1)
return None
# waiting now
self.waiting = True
# request
try:
# log
print('[I] Requesting %s for account \'%s\'...' % (data_name, self.name))
# send message to admin
text = '<b>Session %s requires %s!</b>' % (html.escape(self.name), data_name)
text += '\nSend it like this:'
text += '\n\n<code>/auth %s [%s HERE]</code>' % (html.escape(self.name), data_name.upper())
text += '\n\nBe aware that Telegram has protection' \
+ ' which prohibits sharing your credentials via Telegram.' \
+ ' You have to put underscore as second symbol in order' \
+ ' to evade this protection. For example, <code>12345</code>' \
+ ' becomes <code>1_2345</code>.'
await robot.send_to_admin(text, parse_mode='HTML')
# wait for data
data = await self.q.get()
# remove second symbol
data = data.strip()
data = data[0] + data[2:]
self.waiting = False
return data
except:
self.waiting = False
return None
async def get_password(self) -> str | None:
''' Get password using Telegram bot '''
return await self.__get_data('password')
async def get_code(self):
''' Get code using Telegram bot '''
return await self.__get_data('OTP')
async def provide_data(self, data: str) -> bool:
''' Provide data from other tasks '''
if not self.waiting:
return False
self.waiting = False
await self.q.put(data)
return True
async def _cb_new_message(event) -> None:
    ''' Handle new message.

    Passes the event to every mod that is enabled for the account owning
    the client and that either is "mighty" or lists the first word of the
    message among its tags, then waits for all started handlers to finish.
    '''
    # nothing to dispatch to
    if not _mods:
        return
    # the session is found by the name attached to the client
    name = event.client.session_name
    session = sessions[name]
    account_cfg = session['config']['accounts'][name]
    # first word of the message, lowercased; stays None if there is no text
    msg_word = None
    try:
        msg_word = event.message.message.split(' ')[0].lower()
    except (AttributeError, TypeError):
        # best effort: mighty mods still receive such events
        pass
    # tasks
    tasks = []
    # pass the event to all mods interested in it
    for mod_name, mod in _mods.items():
        # mod is not enabled for this user?
        if mod_name not in account_cfg or not account_cfg[mod_name]:
            continue
        try:
            # the mod matches the message?
            if mod.mod_get_mighty() or msg_word in mod.mod_get_tags():
                tasks.append(
                    asyncio.create_task(mod.mod_new_message(session, event))
                )
        except Exception:
            # a broken mod must not prevent the others from running
            utils.pex()
    # wait for all tasks to complete; asyncio.wait() returns only when
    # every task is done, so a single call is enough
    if tasks:
        await asyncio.wait(tasks)
async def _session_task(session: dict) -> None:
    ''' Task that manages the session.

    Starts the client using the callbacks of session['auth_data'], then
    requests the account info once and stores its id in session['uid'].
    Runs until session['stop_event'] is set. If starting the client or
    getting the account info fails, the failure is logged and the session
    is stopped.
    '''
    name = session['name']
    # log
    print('[I] Started session \'%s\'' % name)
    client = session['client']
    auth = session['auth_data']
    # tasks to monitor
    to_monitor = {
        'stop': asyncio.create_task(session['stop_event'].wait()),
        'bot_start': asyncio.create_task(
            client.start(
                auth.get_phone,
                password=auth.get_password,
                code_callback=auth.get_code,
            )
        ),
    }
    try:
        # whether to continue work
        to_work = True
        while to_work:
            # start has finished, id is unknown and is not requested yet
            if (
                'bot_start' not in to_monitor
                and 'bot_get_me' not in to_monitor
                and session['uid'] is None
            ):
                to_monitor['bot_get_me'] = asyncio.create_task(client.get_me())
            # wait for any of the monitored tasks
            done, _ = await asyncio.wait(
                list(to_monitor.values()),
                return_when=asyncio.FIRST_COMPLETED,
            )
            # process and forget completed tasks
            for key in [k for k, t in to_monitor.items() if t in done]:
                task = to_monitor.pop(key)
                if key == 'stop':
                    to_work = False
                elif task.cancelled():
                    print('[!] Task \'%s\' of session \'%s\' was cancelled' % (key, name))
                    to_work = False
                elif task.exception() is not None:
                    # without this check a failed start would go unnoticed
                    print('[!] Task \'%s\' of session \'%s\' failed: %r'
                          % (key, name, task.exception()))
                    to_work = False
                elif key == 'bot_get_me':
                    me = task.result()
                    if me is None:
                        # no account info, so there is no id to store;
                        # stopping also avoids requesting it in a tight loop
                        print('[!] Session \'%s\' is not authorized' % name)
                        to_work = False
                    else:
                        session['uid'] = me.id
    finally:
        # log
        print('[I] Stopping session \'%s\'' % name)
        # cancel remaining tasks, also when this task itself is cancelled
        for task in to_monitor.values():
            task.cancel()
        to_monitor.clear()
async def add_session(name: str, config: dict) -> bool:
    ''' Start Telegram user session.

    Creates the client for account `name`, registers the new-message
    handler, starts the managing task and stores the session in `sessions`.
    Returns False if the session already exists or the 'sessions'
    directory can't be ensured, True otherwise.
    '''
    # nothing to do for a known session
    if name in sessions:
        return False
    # login is read first, so a wrong config fails before anything is created
    login = config['accounts'][name]['login']
    # session files are stored in 'sessions' directory
    if not utils.ensure_dir('sessions'):
        print('[!] Can\'t ensure \'sessions\' directory existance!')
        return False
    # client
    client = TelegramClient(
        'sessions/%s' % name,
        api_id=config['tg_api_id'],
        api_hash=config['tg_api_hash'],
        connection_retries=None,
        auto_reconnect=True,
    )
    # the handler finds the session by this attribute
    client.session_name = name
    client.add_event_handler(_cb_new_message, events.NewMessage)
    # session info
    session = dict(
        uid=None,
        config=config,
        name=name,
        login=login,
        client=client,
        stop_event=asyncio.Event(),
        queue=asyncio.Queue(),
        auth_data=AuthData(login, name),
        task=None,
    )
    # the task gets the same dict that is stored in the registry
    session['task'] = asyncio.create_task(_session_task(session))
    sessions[name] = session
    return True
async def stop_all() -> None:
    ''' Stop all sessions.

    Sets the stop event of every session, waits for the session tasks to
    finish and replaces the registry with an empty dict. A session task
    that ended with an error is logged and does not prevent the other
    sessions from being stopped.
    '''
    global sessions
    # snapshot: the registry may be changed by other tasks while waiting
    active = list(sessions.values())
    # signal everyone first, so the sessions shut down concurrently
    for s in active:
        s['stop_event'].set()
    for s in active:
        try:
            await s['task']
        except Exception as exc:
            print('[!] Session \'%s\' terminated with error: %r' % (s.get('name'), exc))
    # cleanup; must stay a dict, add_session() stores sessions by name
    sessions = {}
async def provide_auth_data(account_name: str, data: str) -> bool:
    ''' Provide authentication data for account.

    Returns False if there is no session for `account_name`, otherwise
    the result of provide_data() of that session's auth data object.
    '''
    if account_name in sessions:
        auth_data = sessions[account_name]['auth_data']
        return await auth_data.provide_data(data)
    # no such account
    return False
def set_mods(mods: dict) -> None:
    ''' Set dict of available mods.

    A shallow copy of `mods` is stored, so adding or removing keys in the
    caller's dict afterwards does not change the set of available mods.
    '''
    global _mods
    _mods = dict(mods)
def unset_mods() -> None:
    ''' Remove available mods by rebinding the module-level dict to an empty one. '''
    global _mods
    _mods = {}
def get_all_sessions() -> dict:
    ''' Return the module-level `sessions` object itself, not a copy. '''
    return sessions