'''
Chat sanitizer deletes messages with blacklisted words
(unless they have whitelisted words)
'''

import json
import time
import asyncio
import traceback

from telethon import events
from telethon.tl.types import PeerUser
from telethon.errors import FloodWaitError, ForbiddenError, BadRequestError

import hubot
import robot
import utils

# 30 minutes
FLOOD_SLEEP_TIME = 60 * 30
# how many messages are requested per get_messages() call
VIOLENT_PACK_SIZE = 100
# send a progress report after roughly this many fetched messages
_NOTIFY_EVERY = 10000
# pause (seconds) between two packs when nothing went wrong
_PACK_PAUSE = 3
# pause (seconds) before deleting an incoming message
_INCOMING_DELETE_DELAY = 5

_HELP_TEXT = (
    'mod_chat_sanitizer commands:\n'
    '\n⚪️ mcs - get this help message'
    '\n⚪️ mcs_rst - reset mod\'s storage. Deletes settings for ALL added accounts.'
    '\n⚪️ mcs_list - list all chats added to sanitization mod'
    '\n⚪️ mcs_add - add chat to sanitization mod'
    '\n⚪️ mcs_del - delete chat from sanitization mod'
    '\n⚪️ mcs_off - turn sanitization OFF for chat (or all chats)'
    '\n⚪️ mcs_on - turn sanitization ON for chat (or all chats)'
    '\n⚪️ mcs_ultraviolence [START_FROM_ID] - perform complete sanitization of chat (reports will be sent to chat, where the command was executed)'
    '\n⚪️ mcs_stopit - stop ultraviolence'
    '\n⚪️ mcs_bl [new blacklist, one word per line] - set new blacklist for chat (or show current blacklist)'
    '\n⚪️ mcs_wl [new whilelist, one word per line] - like blacklist, but whitelist'
    '\n\nIf chat is added and enabled, then new messages will be sanitized.'
    ' If \'ultraviolence\' is induced, the entire chat history is sanitized, starting from START_FROM_ID (or 0, if not specified).'
    ' The message will be deleted if it contains substrings from blacklist AND it does NOT contain substrings from whitelist.'
)

# config dict handed to mod_init()
_config = None
# { session_name: { 'chats': { chat_id(str): {...} }, 'unlock_timestamp': float } }
_mod_config = {}
# { session_name: { target_chat_id: worker dict (see create_violent_worker) } }
_workers = {}


async def bad_funct() -> None:
    '''
    Always raises ZeroDivisionError when awaited.

    NOTE(review): nothing in this file calls it - looks like a leftover
    debugging aid; confirm before removing.
    '''
    return 5 / 0


def _full_name(entity) -> str | None:
    '''
    Build "first_name last_name" for an entity.
    Returns None if the entity is None or has no (non-empty) first_name.
    '''
    first = getattr(entity, 'first_name', None)
    if not first:
        return None

    last = getattr(entity, 'last_name', None)
    if last is not None:
        return first + ' ' + last

    return first


async def _forward_name(fwd) -> str | None:
    '''
    Get the display name of the original author of a forwarded message.
    Returns None if no name can be determined.
    '''
    header = fwd.original_fwd

    # privacy settings? then only a plain name is available
    if header.from_name is not None:
        return header.from_name
    if header.saved_from_name is not None:
        return header.saved_from_name

    return _full_name(await fwd.get_sender())


async def _is_message_forbidden(msg, blacklist, whitelist, check_names: bool) -> bool:
    '''
    Decide whether a single history message must be deleted.

    The text is checked first. If check_names is set, the sender's name
    and then the name of the original author of a forward are checked too.
    A whitelist hit at any stage stops the checks and keeps the message.
    '''
    text = msg.message
    if text:
        if is_text_forbidden(text, blacklist, whitelist):
            return True
        if is_text_whitelisted(text, whitelist):
            return False

    # names are not to be checked
    if not check_names:
        return False

    # check if username is forbidden
    try:
        name = _full_name(await msg.get_sender())
        if name:
            if is_text_whitelisted(name, whitelist):
                return False
            if is_text_forbidden(name, blacklist, whitelist):
                return True
    except Exception:
        # sender lookup is best-effort; still look at the forward header.
        # utils.pex() is a project helper - presumably it reports the
        # current exception (it is called from except blocks only).
        utils.pex()

    # check if forwarded from forbidden person
    fwd = msg.forward
    if fwd is None:
        return False

    try:
        fwd_name = await _forward_name(fwd)
    except Exception:
        utils.pex()
        return False

    # is_text_forbidden() already returns False for whitelisted names
    return bool(fwd_name) and is_text_forbidden(fwd_name, blacklist, whitelist)


async def _violent_worker(data: dict) -> None:
    '''
    Coroutine for violent sanitization

    Walks the history of data['target_chat_id'] in packs of
    VIOLENT_PACK_SIZE messages starting from data['start_from_id'],
    deletes forbidden messages and reports to data['report_chat_id'].
    Runs until the history ends, permissions are revoked (ForbiddenError)
    or data['stop_event'] is set.

    The black/white lists are snapshotted at start: later edits of the
    config do not affect a running worker.
    '''
    session = data['session']
    client = session['client']

    # target peer
    target_peer = utils.id_to_peer(data['target_chat_id'])
    # report-to peer
    report_peer = utils.id_to_peer(data['report_chat_id'])

    chat_cfg = _mod_config[session['name']]['chats'][data['target_chat_id']]
    blacklist = list(chat_cfg['blacklist'])
    whitelist = list(chat_cfg['whitelist'])

    # current messages offset
    messages_offset = data['start_from_id']
    # move the offset forward before the next fetch? Cleared when the
    # current pack has to be retried (failed fetch, flood wait on delete),
    # otherwise that pack would be skipped silently.
    advance = False
    notify_in = _NOTIFY_EVERY

    # task to wait for stop event
    stop_t = asyncio.create_task(data['stop_event'].wait())

    try:
        # work while allowed to
        while True:
            if advance:
                messages_offset += VIOLENT_PACK_SIZE
            advance = True

            # time to sleep after everything
            time_to_sleep = _PACK_PAUSE

            # message getting task
            msgs_t = asyncio.create_task(client.get_messages(
                target_peer,
                offset_id=messages_offset,
                limit=VIOLENT_PACK_SIZE,
                reverse=True,
            ))

            # wait for tasks
            done, _ = await asyncio.wait([msgs_t, stop_t], return_when=asyncio.FIRST_COMPLETED)

            # to stop
            if stop_t in done:
                msgs_t.cancel()
                await client.send_message(report_peer, message='mod_chat_sanitizer:\n\nStopping by user request... Offset: %s' % messages_offset)
                break

            if notify_in <= 0:
                notify_in = _NOTIFY_EVERY
                await client.send_message(report_peer, message='mod_chat_sanitizer:\n\nProcessed some messages. Current offset: %s' % (messages_offset))

            # get the results
            res = None
            try:
                res = msgs_t.result()
            except FloodWaitError as e:
                await client.send_message(report_peer, message='mod_chat_sanitizer:\n\nFloodWaitError for get_messages. Sleeping for %s seconds. Offset: %s' % (e.seconds, messages_offset))
                time_to_sleep = e.seconds + 1
            except ForbiddenError:
                await client.send_message(report_peer, message='mod_chat_sanitizer:\n\nForbiddenError - probably permissions were revoked. Stopping... Offset: %s' % messages_offset)
                break
            except (Exception, asyncio.CancelledError):
                # CancelledError here means the fetch task (not this worker)
                # was cancelled - treat it like any other failed fetch
                utils.pex()

            # messages to delete
            messages_to_delete = []

            if res is None:
                # fetch failed - retry the same pack after the pause
                advance = False
            else:
                # no more messages
                if len(res) == 0:
                    await client.send_message(report_peer, message='mod_chat_sanitizer:\n\nLooks like no new messages. Stopping... Offset: %s' % messages_offset)
                    break

                notify_in -= len(res)

                # messages checker
                for msg in res:
                    try:
                        if await _is_message_forbidden(msg, blacklist, whitelist, data['check_names']):
                            messages_to_delete.append(msg.id)
                    except Exception:
                        utils.pex()

            # delete the messages
            if messages_to_delete:
                try:
                    await client.delete_messages(target_peer, messages_to_delete, revoke=True)
                    chat_cfg['deleted_count'] += len(messages_to_delete)
                except FloodWaitError as e:
                    await client.send_message(report_peer, message='mod_chat_sanitizer:\n\nFloodWaitError for delete_messages. Sleeping for %s seconds. Offset: %s' % (e.seconds, messages_offset))
                    time_to_sleep = e.seconds + 1
                    # nothing was deleted - scan this pack again
                    advance = False
                except ForbiddenError:
                    await client.send_message(report_peer, message='mod_chat_sanitizer:\n\nForbiddenError - probably permissions were revoked. Stopping... Offset: %s' % messages_offset)
                    break
                except Exception:
                    utils.pex()

            # wait for timer
            sleep_t = asyncio.create_task(asyncio.sleep(time_to_sleep))
            done, _ = await asyncio.wait([sleep_t, stop_t], return_when=asyncio.FIRST_COMPLETED)

            if stop_t in done:
                sleep_t.cancel()
                await client.send_message(report_peer, message='mod_chat_sanitizer:\n\nStopping by user request... Offset: %s' % messages_offset)
                break
    finally:
        # do not leave the stop-event waiter pending when the worker
        # ends on its own (no-op if it has already finished)
        stop_t.cancel()


def mod_load_config(path: str = 'modstorage_mcs/config.json') -> bool:
    '''
    Load mod config from file

    Returns True on success. Returns False (leaving the current config
    untouched) if the file cannot be read, is not valid JSON or its root
    is not a JSON object.
    '''
    global _mod_config

    try:
        with open(path, 'r') as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError
        return False

    # failure? (the rest of the mod indexes the config as a dict)
    if not isinstance(loaded, dict):
        return False

    # just save
    _mod_config = loaded
    return True


def mod_save_config(path: str = 'modstorage_mcs/config.json') -> bool:
    '''
    Save mod config to file

    Returns True on success, False if serialization or writing failed.
    '''
    try:
        # serialize first: the file is not truncated if this fails
        serialized = json.dumps(_mod_config, indent=4)
        with open(path, 'w') as f:
            f.write(serialized)
    except (OSError, TypeError, ValueError):
        return False

    return True


def mod_get_config_for_session(session_name) -> dict:
    '''
    Get config for session

    A fresh config (no chats, not locked) is created and registered
    if the session has none yet.
    '''
    return _mod_config.setdefault(session_name, {
        'chats': {},
        'unlock_timestamp': 0.0
    })


def is_text_whitelisted(text, whitelist) -> bool:
    ''' Returns True if text contains something from whitelist (text is lowercased first) '''
    lowered = text.lower()
    return any(word in lowered for word in whitelist)


def is_text_forbidden(text, blacklist, whitelist) -> bool:
    '''
    Returns True if text (lowercased) contains something from blacklist
    AND nothing from whitelist, False otherwise
    '''
    lowered = text.lower()

    # check if has blacklisted content
    if not any(word in lowered for word in blacklist):
        return False

    # forbidden unless whitelisted content exists
    return not any(word in lowered for word in whitelist)


def create_violent_worker(session, target_chat_id: int, report_chat_id: int, start_from_id: int, check_names: bool = True) -> bool:
    '''
    Creates agressive message sanitizer.
    Returns True on success.
    Does not succeed if chat is already being sanitized.

    Must be called while an event loop is running (asyncio.create_task).
    NOTE(review): msg_outgoing() passes target_chat_id as str, not int.
    '''
    # add to workers if not added yet
    session_workers = _workers.setdefault(session['name'], {})

    # chat is already being sanitized
    existing = session_workers.get(target_chat_id)
    if existing is not None and not existing['task'].done():
        return False

    # create worker for chat
    worker = {
        'session': session,
        'target_chat_id': target_chat_id,
        'report_chat_id': report_chat_id,
        'start_from_id': start_from_id,
        'check_names': check_names,
        'stop_event': asyncio.Event(),
        'task': None
    }

    # create the task
    worker['task'] = asyncio.create_task(_violent_worker(worker))

    # save
    session_workers[target_chat_id] = worker
    return True


def is_worker_active(session, target_chat_id: int) -> bool:
    ''' Returns True if chat is being sanitized '''
    worker = _workers.get(session['name'], {}).get(target_chat_id)
    if worker is None:
        return False

    return not worker['task'].done()


async def stop_violent_worker(worker: dict) -> None:
    '''
    Stop the worker right now.

    Sets the worker's stop event and waits for its task to finish.
    Never raises: any failure is handed to utils.pex().
    '''
    try:
        # already stopped
        if worker['task'].done():
            return

        # stop
        worker['stop_event'].set()
        await worker['task']
    except (Exception, asyncio.CancelledError):
        # CancelledError is swallowed on purpose (as before): a cancelled
        # worker task must not abort stop_all / mod_deinit
        utils.pex()


async def stop_all_violent_workers(session_name: str | None = None) -> None:
    '''
    Stop all workers.

    If session_name is set, only workers of that session are stopped.
    '''
    # session name is not set - stop all
    if not session_name:
        workers_to_stop = [
            worker
            for session_workers in _workers.values()
            for worker in session_workers.values()
        ]
    # session name is set
    else:
        if session_name not in _workers:
            return
        workers_to_stop = list(_workers[session_name].values())

    # stop all
    for worker in workers_to_stop:
        await stop_violent_worker(worker)


async def _set_chat_state(event, cfg: dict, args: list, enabled: bool) -> None:
    ''' Handle mcs_on (enabled=True) / mcs_off (enabled=False) '''
    if not args:
        await event.reply(message='mcs_on [CHAT_ID]' if enabled else 'mcs_off [CHAT_ID]')
        return

    chat_id = str(args[0])

    if chat_id not in cfg['chats']:
        await event.reply(message='Chat #%s is not added' % chat_id)
        return

    chat = cfg['chats'][chat_id]

    if bool(chat['state']) == enabled:
        if enabled:
            await event.reply(message='Chat #%s is already enabled' % chat_id)
        else:
            await event.reply(message='Chat #%s is already disabled' % chat_id)
        return

    chat['state'] = enabled
    mod_save_config()

    if enabled:
        await event.reply(message='Chat #%s is enabled (sanitized from now on)' % chat_id)
    else:
        await event.reply(message='Chat #%s is disabled (not sanitized anymore)' % chat_id)


async def _edit_word_list(event, cfg: dict, text: str, kind: str) -> None:
    '''
    Handle mcs_bl (kind='blacklist') / mcs_wl (kind='whitelist').

    The first line of text is "<command> <chat id>"; every following
    non-empty line is one substring of the new list. Without such lines
    the current list is shown instead.
    '''
    cmd_name = 'mcs_bl' if kind == 'blacklist' else 'mcs_wl'
    title = kind.capitalize()

    # lines
    lines = [i.strip() for i in text.split('\n') if i.strip()]
    head_args = [i for i in lines[0].split(' ') if i][1:]
    entries = lines[1:]

    # get the chat ID
    if not head_args:
        await event.reply(message='Syntax:\n\n%s \nsubstr1\nsubstr2\nsubstr3\n...\n\nYou may omit substrings to read current %s' % (cmd_name, kind))
        return

    chat_id = str(head_args[0])

    # no such chat
    if chat_id not in cfg['chats']:
        await event.reply(message='Chat #%s is not added' % chat_id)
        return

    chat = cfg['chats'][chat_id]

    # less than one line - log existing
    if not entries:
        if not chat[kind]:
            response = '%s for chat #%s is empty' % (title, chat_id)
        else:
            response = '%s for chat #%s:\n' % (title, chat_id)
            response += ''.join('\n' + word for word in chat[kind])

        await event.reply(message=response)
        return

    # create new list: lowercased, duplicates dropped, order preserved
    chat[kind] = list(dict.fromkeys(entry.lower() for entry in entries))
    mod_save_config()

    # create response
    response = 'New %s content for chat %s:\n' % (kind, chat_id)
    response += ''.join('\n%s' % word for word in chat[kind])

    await event.reply(message=response)


async def msg_outgoing(session, event) -> None:
    '''
    Handle outgoing message

    Outgoing messages are treated as commands: the first space-separated
    word (case-insensitive) selects the command, the rest are arguments.
    Anything that is not an mcs* command is ignored.
    '''
    global _mod_config

    # get the message and session config
    msg = event.message
    cfg = mod_get_config_for_session(session['name'])

    # get the text
    text = getattr(msg, 'message', None)
    if not text:
        return

    # get args
    args = [i for i in text.split(' ') if i]
    # text of spaces only - nothing to do
    if not args:
        return

    # cmd and args
    cmd = args[0].lower()
    args = args[1:]

    # help
    if cmd == 'mcs':
        if args:
            return

        await event.reply(message=_HELP_TEXT)
        return

    # config reset
    if cmd == 'mcs_rst':
        _mod_config = {}
        # mod_save_config() is synchronous - it must not be awaited
        mod_save_config()

        await event.reply(message='Configuration for mod_chat_sanitizer is reset')
        return

    # list all chats
    if cmd == 'mcs_list':
        response = 'Chat list (IDs):'

        if cfg['chats']:
            for listed_id, chat in cfg['chats'].items():
                response += '\n\n- Chat %s' % listed_id
                response += '\nEnabled: %s' % chat['state']
                response += '\nDeleted (all time): %s' % chat['deleted_count']
                response += '\nLast deleted ID: %s' % chat['last_deleted_id']
                response += '\nBlacklist size: %s' % len(chat['blacklist'])
                response += '\nWhitelist size: %s' % len(chat['whitelist'])
        else:
            response += '\n no chats added'

        await event.reply(message=response)
        return

    # add chat
    if cmd == 'mcs_add':
        if not args:
            await event.reply(message='mcs_add [CHAT_ID]')
            return

        chat_id = str(args[0])

        if chat_id in cfg['chats']:
            await event.reply(message='Chat #%s is already added' % chat_id)
            return

        # add the chat
        cfg['chats'][chat_id] = {
            'state': False,
            'deleted_count': 0,
            'last_deleted_id': None,
            'blacklist': [],
            'whitelist': []
        }

        # save the config
        mod_save_config()

        await event.reply(message='Chat #%s is added' % chat_id)
        return

    # delete chat
    if cmd == 'mcs_del':
        if not args:
            await event.reply(message='mcs_del [CHAT_ID]')
            return

        chat_id = str(args[0])

        if chat_id not in cfg['chats']:
            await event.reply(message='Chat #%s is not added, so not deleted' % chat_id)
            return

        if is_worker_active(session, chat_id):
            await event.reply(message='Chat #%s is being violently sanitized, so not deleted' % chat_id)
            return

        # remove the chat
        del cfg['chats'][chat_id]

        # save the config
        mod_save_config()

        await event.reply(message='Chat #%s is deleted' % chat_id)
        return

    # disable chat
    if cmd == 'mcs_off':
        await _set_chat_state(event, cfg, args, False)
        return

    # enable chat
    if cmd == 'mcs_on':
        await _set_chat_state(event, cfg, args, True)
        return

    # agressive sanitization
    if cmd == 'mcs_ultraviolence':
        if not args:
            await event.reply(message='mcs_ultraviolence [START_FROM_ID]')
            return

        chat_id = str(args[0])
        start_from = 0

        # check if start_from is specified
        if len(args) >= 2:
            try:
                start_from = int(args[1])
            except ValueError:
                start_from = -1

            if start_from < 0:
                await event.reply(message='START_FROM_ID must be >= 0')
                return

        if chat_id not in cfg['chats']:
            await event.reply(message='Chat %s is not added' % chat_id)
            return

        if is_worker_active(session, chat_id):
            await event.reply(message='Chat %s is already being violently sanitized, so not starting again' % chat_id)
            return

        if not cfg['chats'][chat_id]['blacklist']:
            await event.reply(message='Chat %s has empty blacklist - refusing to start' % chat_id)
            return

        # start (reports go to the chat where the command was sent)
        if create_violent_worker(session, chat_id, utils.peer_to_id(msg.peer_id), start_from):
            await event.reply(message='Started violent sanitization of chat %s' % chat_id)
        else:
            await event.reply(message='Failed to start violent sanitization of chat %s' % chat_id)

        return

    # stop sanitization
    if cmd == 'mcs_stopit':
        if not args:
            await event.reply(message='mcs_stopit ')
            return

        chat_id = 'all' if args[0].lower() == 'all' else str(args[0])

        # all
        if chat_id == 'all':
            await event.reply(message='Stopping violent sanitization of all chats...')
            await stop_all_violent_workers(session['name'])
            await event.reply(message='Violent sanitization of all chats is stopped')
            return

        # stop only one
        if not is_worker_active(session, chat_id):
            await event.reply(message='Chat %s is not being violently sanitized, so not stopping (no need to stop - not doing anything)' % chat_id)
            return

        # get the worker
        worker = _workers[session['name']][chat_id]

        await event.reply(message='Stopping violent sanitization of chat %s...' % chat_id)
        await stop_violent_worker(worker)
        await event.reply(message='Violent sanitization of chat %s is stopped' % chat_id)

        return

    # modify blacklist
    if cmd == 'mcs_bl':
        await _edit_word_list(event, cfg, text, 'blacklist')
        return

    # modify whitelist
    if cmd == 'mcs_wl':
        await _edit_word_list(event, cfg, text, 'whitelist')
        return


async def msg_incoming(session, event) -> None:
    '''
    Handle incoming message

    Deletes the message (after a short delay) if its chat is added and
    enabled and the text is forbidden. Does nothing while the session is
    locked (cfg['unlock_timestamp'] is in the future); the lock is set
    after a FloodWaitError or an unexpected error.
    '''
    cfg = mod_get_config_for_session(session['name'])

    # locked currently
    if time.time() < cfg['unlock_timestamp']:
        return

    # get the message
    msg = event.message

    # get the text
    text = getattr(msg, 'message', None)
    if not text:
        return

    # get chat id; config keys are always strings (they come from
    # str(args[0]) or from JSON object keys), so normalize the lookup key
    chat_id = str(utils.peer_to_id(msg.peer_id))

    # not added
    chat = cfg['chats'].get(chat_id)
    if chat is None:
        return

    # disabled
    if not chat['state']:
        return

    # text is not forbidden
    if not is_text_forbidden(text, chat['blacklist'], chat['whitelist']):
        return

    # delete the message
    try:
        await asyncio.sleep(_INCOMING_DELETE_DELAY)  # to evade bot detection

        # we may get locked during timeout above
        if time.time() < cfg['unlock_timestamp']:
            return

        await msg.delete()
        chat['deleted_count'] += 1
    except BadRequestError:
        pass
    except ForbiddenError:
        chat['state'] = False
        await robot.send_to_admin(
            'mod_chat_sanitizer ForbiddenError, disabling chat\n\nsession_name = %s\nchat_id = %s' % (
                session['name'],
                chat_id
            )
        )
    except FloodWaitError as e:
        cfg['unlock_timestamp'] = time.time() + e.seconds + 1
        await robot.send_to_admin(
            'mod_chat_sanitizer FloodWaitError:\n\nsession_name = %s\nchat_id = %s\nwait_for = %s' % (
                session['name'],
                chat_id,
                e.seconds
            )
        )
    except Exception:
        cfg['unlock_timestamp'] = time.time() + FLOOD_SLEEP_TIME
        await robot.send_to_admin(
            'mod_chat_sanitizer exception:\n\nsession_name = %s\nchat_id = %s\n\n%s' % (
                session['name'],
                chat_id,
                traceback.format_exc()
            )
        )


async def mod_init(config: dict) -> bool:
    '''
    Initialize the mod

    NOTE(review): annotated as returning bool, but nothing is returned
    (i.e. None). Left as is - confirm what the loader expects.
    '''
    global _config

    _config = config

    # create directories
    utils.ensure_dir('modstorage_mcs')

    # try to load config
    if not mod_load_config('modstorage_mcs/config.json'):
        print('[!] mod_chat_sanitizer has failed to load config, regenerating...')
        mod_save_config('modstorage_mcs/config.json')

    print('[I] mod_chat_sanitizer is initialized')


async def mod_deinit() -> None:
    ''' Deinitialize the mod: stop all workers and save the config '''
    await stop_all_violent_workers()
    mod_save_config('modstorage_mcs/config.json')

    print('[I] mod_chat_sanitizer is deinitialized')


def mod_get_mighty() -> bool:
    ''' Mod is called 'mighty' if it receives all messages '''
    return True


def mod_get_tags() -> list:
    ''' Get tags used by the mod '''
    return ['mcs']


async def mod_new_message(session, event) -> None:
    '''
    Handle new message

    Never raises: any failure in a handler is handed to utils.pex().
    '''
    # WARNING: this mod is defined as mighty, so the messages must be
    # really filtered well.
    try:
        # get the message
        msg = event.message

        # outgoing - handle
        if msg.out:
            await msg_outgoing(session, event)
            return

        # incoming - handle
        await msg_incoming(session, event)
    except Exception:
        utils.pex()