''' mod_video_downloader for tg-utility ''' import json import time import select import shutil import asyncio import traceback import subprocess import os from telethon import events from telethon.tl.types import PeerUser from telethon.tl.types import DocumentAttributeVideo from telethon.tl.types import DocumentAttributeAudio import utils # application config _config = None # cached video qualities _cached_qualities = { '03dcdde34c18d6128cf906a10365f014': { 'url': 'https://www.pornhub.com/view_video.php?viewkey=65edb3a6aede0', 'format': '240p', 'ext': 'mp4' } } def _get_all_qualities_raw(url: str, proxy: bool, timeout: float = 20) -> dict | str: ''' Get all video qualities as dict ''' try: # prepare arguments args = [ utils.which('yt-dlp'), '-J', url ] if proxy: args = [utils.which('proxychains4')] + args # start the process cp = subprocess.run( args, capture_output=True, timeout=timeout ) # check the result txt = cp.stdout.decode(encoding='utf-8') try: j = json.loads(txt) except: return cp.stderr.decode(encoding='ascii') if not j: return 'yt-dlp has failed to get the list of qualities' # dict to return res = {} title = j['title'] if 'title' in j else 'Без названия' author = j['channel'] if 'channel' in j else 'Неизвестен' thumbnail = j['thumbnail'] if 'thumbnail' in j else None # one format only - convert to many if 'formats' not in j: j['formats'] = [{ 'format': j['format'], 'format_id': j['format_id'], 'ext': j['ext'], 'video_ext': j['video_ext'], 'audio_ext': j['audio_ext'] }] if 'filesize' in j: j['formats'][0]['filesize'] = j['filesize'] # check all formats for f in j['formats']: obj = { 'title': title, 'author': author, 'url': url, 'format': f['format_id'], 'thumbnail': thumbnail, 'ext': f['ext'], 'format_name': f['format'], 'is_audio': f['audio_ext'] != 'none', '_added': time.time() } # filesize exists try: if 'filesize' in f: obj['filesize'] = int(f['filesize']) except: pass cache_id = utils.get_md5('\n'.join([str(obj[i]) for i in obj if i[0] != '_'])) _cached_qualities[cache_id] = obj res[cache_id] = obj return res except subprocess.TimeoutExpired: return 'Request timed out' except: return traceback.format_exc() async def _get_all_qualities(url: str, proxy: bool, timeout: float = 20) -> dict | str: ''' Async version ''' try: return await asyncio.to_thread(_get_all_qualities_raw, url, proxy, timeout) except: return traceback.format_exc() def _download_video_raw(url: str, quality_code : str, path: str, proxy: bool, timeout: float = 20) -> str | bool: ''' Download video from URL, use quality code to get info from cache ''' try: # no such quality cached if quality_code not in _cached_qualities: return 'Not found quality for specified CODE!' # get data data = _cached_qualities[quality_code] # prepare arguments args = [ utils.which('yt-dlp'), '--newline', '-f', data['format'], '-o', path, url ] if proxy: args = [utils.which('proxychains4')] + args # start the process cp = subprocess.Popen( args, bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, universal_newlines=True ) # check output every second last_update_time = time.time() last_progress = '' while True: lines = [] ready = True while ready: # check if terminated if cp.poll() is not None: # success if os.path.isfile(path): return True # failure else: return 'youtube-dl has exited but output file does not exist' break ready, _, _ = select.select([cp.stdout], [], [], 0.1) # data available if ready: l = cp.stdout.readline().strip() # check if line starts with [download] if not l.startswith('[download] '): continue # split parts = [i for i in l.replace('[download]', '').split(' ') if i] # no percent if '%' not in parts[0]: continue # yes percent - that's progress new_progress = parts[0] # change if new_progress != last_progress: last_progress = new_progress last_update_time = time.time() time.sleep(1) # timed out if time.time() - last_update_time >= timeout: return "Timed out" except: return traceback.format_exc() async def _download_video(url: str, quality_code : str, path: str, proxy: bool, timeout: float = 20) -> str | bool: ''' Async version ''' try: return await asyncio.to_thread(_download_video_raw, url, quality_code, path, proxy, timeout) except: return traceback.format_exc() def _get_video_data_raw(path: str, audio: bool) -> dict | None: ''' Get video duration, width, height and file size ''' try: # prepare arguments args = [ utils.which('ffprobe'), '-v', 'error', '-select_streams', 'a:0' if audio else 'v:0', '-show_entries', 'stream=duration' if audio else 'stream=width,height,duration', '-show_entries', 'format=size', '-of', 'default=noprint_wrappers=1:nokey=1', path ] # start the process cp = subprocess.run( args, capture_output=True, timeout=10 ) # check the result txt = None try: txt = cp.stdout.decode(encoding='ascii').strip() except: txt = cp.stderr.decode(encoding='ascii').strip() return txt parts = [i.strip() for i in txt.split('\n') if i.strip()] # result for audio if audio: return { 'duration': int(float(parts[0])), 'size': int(parts[1]) } # result for video else: return { 'width': int(parts[0]), 'height': int(parts[1]), 'duration': int(float(parts[2])), 'size': int(parts[3]) } except subprocess.TimeoutExpired: return 'ffprobe timed out' except: return traceback.format_exc() async def _get_video_data(path: str, audio: bool) -> dict | None: ''' Async version ''' try: return await asyncio.to_thread(_get_video_data_raw, path, audio) except: return traceback.format_exc() def _generate_thumb_raw(video: str, timestamp: int, thumb: str) -> bool: ''' Generates a thumbnail for Telegram ''' try: # prepare arguments args = [ utils.which('ffmpeg'), '-ss', str(timestamp), '-i', video, '-vf', 'thumbnail,scale=\'min(280,iw)\':\'min(280,ih)\':force_original_aspect_ratio=decrease', '-frames:v', '1', thumb ] # start the process cp = subprocess.run( args, capture_output=True, timeout=10 ) # check the result txt = None try: txt = cp.stdout.decode(encoding='ascii').strip() txt = cp.stderr.decode(encoding='ascii').strip() except: pass parts = [i.strip() for i in txt.split('\n') if i.strip()] # result return os.path.isfile(thumb) except subprocess.TimeoutExpired: return 'ffprobe timed out' except: return traceback.format_exc() return False async def _generate_thumb(video: str, timestamp: int, thumb: str) -> dict | None: ''' Async version ''' try: return await asyncio.to_thread(_generate_thumb_raw, video, timestamp, thumb) except: return traceback.format_exc() def _download_thumb_for_tg_raw(url: str, proxy: bool, thumb: str) -> str | None: ''' Downloads a thumbnail for Telegram ''' try: # prepare arguments args = [ utils.which('ffmpeg'), '-i', url, '-vf', 'thumbnail,scale=\'min(1000,iw)\':\'min(1000,ih)\':force_original_aspect_ratio=decrease', thumb ] if proxy: args = [utils.which('proxychains4')] + args # start the process cp = subprocess.run( args, capture_output=True, timeout=10 ) # check the result txt = None try: txt = cp.stdout.decode(encoding='ascii').strip() txt = cp.stderr.decode(encoding='ascii').strip() except: pass parts = [i.strip() for i in txt.split('\n') if i.strip()] # result if not os.path.isfile(thumb): print('File does not exist somewhy') return None except subprocess.TimeoutExpired: return 'ffprobe timed out' except: return traceback.format_exc() return 'Wrong' async def _download_thumb_for_tg(url: str, proxy: bool, thumb: str) -> str | None: ''' Async version ''' try: return await asyncio.to_thread(_download_thumb_for_tg_raw, url, proxy, thumb) except: return traceback.format_exc() def _execute_args_check_file_raw(args: list[str], file_to_check: str) -> None | str: ''' Downloads a thumbnail for Telegram ''' try: # start the process cp = subprocess.run( args, capture_output=True, timeout=30 ) # file exists? if os.path.isfile(file_to_check): return None # check the result txt = None try: txt = cp.stderr.decode(encoding='ascii').strip() return txt except: pass except subprocess.TimeoutExpired: return 'Command has timed out' except: return traceback.format_exc() return 'Failed to execute command' async def _execute_args_check_file(args: list[str], file_to_check: str) -> str | None: ''' Async version ''' try: return await asyncio.to_thread(_execute_args_check_file_raw, args, file_to_check) except: return traceback.format_exc() async def mod_init(config: dict) -> bool: ''' Initialize the mod ''' global _config _config = config # delete old temp try: shutil.rmtree('mvd_temp') pass except: pass utils.ensure_dir('mvd_temp') print('[I] mod_video_downloader is initialized') async def mod_deinit() -> None: ''' Deinitialize the mod ''' # delete temp try: shutil.rmtree('mvd_temp') pass except: pass print('[I] mod_video_downloader is deinitialized') def mod_get_mighty() -> bool: ''' Mod is called 'mighty' if it receives all messages ''' return False def mod_get_tags() -> None: ''' Get tags used by the mod ''' return ['md', 'mvdl', 'mvdlp', 'madl', 'madlp', 'mdd', 'mddp'] async def mod_new_message(session, event) -> None: ''' Handle new message ''' try: # get the message msg = event.message # not outgoing - do not process #if not msg.out: # return # peer must be user peer = msg.peer_id if type(peer) is not PeerUser: return # get the text text = msg.message # get args args = [i for i in text.split(' ') if i] cmd = args[0].lower() args = args[1:] await asyncio.sleep(0.5) # help if cmd == 'md': response_text = 'mod_audio_video_downloader:' response_text += '\n- mvdl[p] [URL] - get list of all video qualities' response_text += '\n- madl[p] [URL] - get list of all audio qualities' response_text += '\n- mdd[p] [CODE] [TITLE] [§ PERFORMER] [§ ALBUM ART URL] - download video or audio (for audio only: track title)' response_text += '\n\nUse \'p\' letter to utilize proxy' await event.reply(message=response_text) # list video qualities elif cmd.startswith('mvdl'): if not args: await event.reply(message='No URL!') return await event.reply(message='Checking URL... Please wait, you\'ll be notified if an error happens!') qualities = await _get_all_qualities(' '.join(args), cmd[-1] == 'p') # error if type(qualities) is str: await event.reply(message='Error:\n\n%s' % qualities) return # success result = 'Qualities:' for qid in qualities: data = qualities[qid] # not a video if data['is_audio']: continue # extensions to ignore if data['ext'] in ['webm', 'mhtml']: continue result += '\n\nmdd%s %s' % ('p' if cmd[-1] == 'p' else '', qid) result += '\n- Format name: %s' % data['format_name'] result += '\n- Extension: %s' % data['ext'] if 'filesize' in data: result += '\n- Filesize: %s KB' % (data['filesize'] / 1000) await event.reply(message=result, parse_mode='HTML') # list audio qualities elif cmd.startswith('madl'): if not args: await event.reply(message='No URL!') return await event.reply(message='Checking URL... Please wait, you\'ll be notified if an error happens!') qualities = await _get_all_qualities(' '.join(args), cmd[-1] == 'p') # error if type(qualities) is str: await event.reply(message='Error:\n\n%s' % qualities) return # success result = 'Qualities:' for qid in qualities: data = qualities[qid] # not an audio if not data['is_audio']: continue # extensions to ignore if data['ext'] in ['webm']: continue result += '\n\nmdd%s %s' % ('p' if cmd[-1] == 'p' else '', qid) result += '\n- Format name: %s' % data['format_name'] result += '\n- Extension: %s' % data['ext'] if 'filesize' in data: result += '\n- Filesize: %s KB' % (data['filesize'] / 1000) await event.reply(message=result, parse_mode='HTML') # download elif cmd.startswith('mdd'): if not args: await event.reply(message='No CODE!') return # get the code and check it code = args[0] if code not in _cached_qualities: await event.reply(message='This code does not exist. Use \'mvdl[p]\' to obtain the code.') return # get video data data = _cached_qualities[code] await event.reply(message='Downloading the media... Please wait, you\'ll be notified if an error happens!') res = await _download_video(data['url'], code, 'mvd_temp/%s.bin' % code, cmd[-1] == 'p') # res is str - error if type(res) is str: utils.rm_glob('mvd_temp/%s.*' % code) await event.reply(message='Failed to download media: %s' % res) return # res is false if not res: utils.rm_glob('mvd_temp/%s.*' % code) await event.reply(message='Something went wrong during downloading...' % res) return # old name and new name old_name = 'mvd_temp/%s.bin' % code new_name = 'mvd_temp/%s.%s' % (code, data['ext']) # rename try: shutil.move(old_name, new_name) pass except: utils.rm_glob('mvd_temp/%s.*' % code) await event.reply(message='Failed to rename downloaded media') return # get video data video_data = await _get_video_data(new_name, data['is_audio']) if type(video_data) is not dict: utils.rm_glob('mvd_temp/%s.*' % code) await event.reply(message='Failed to use \'ffprobe\' to get media data:\n\n%s' % video_data) return # audio if data['is_audio']: # assign track title remains = [i.strip() for i in ' '.join(args[1:]).split('§') if i.strip()] title = data['title'] author = 'tg-utility & %s' % data['author'] thumbnail = data['thumbnail'] # use title and author from user message if remains: title = remains[0] if len(remains) >= 2: author = remains[1] if len(remains) >= 3: thumbnail = remains[2] # thumbnail exists, download it and change scale if thumbnail: thumb_path = 'mvd_temp/%s.jpg' % code thumbnail = await _download_thumb_for_tg(thumbnail, cmd[-1] == 'p', thumb_path) if thumbnail: await event.reply(message='WARNING (audio is still being processed - THIS IS NOT AN ERROR). Failed to download thumbnail.\n\n%s' % thumbnail) thumbnail = None else: thumbnail = thumb_path # thumbnail really exists, apply it to track if thumbnail: new_file_args = None very_new_file = None if data['ext'] in ('m4a', 'aac'): very_new_file = 'mvd_temp/%s.covered.%s' % (code, data['ext']) new_file_args = [ utils.which('ffmpeg'), '-i', new_name, '-i', thumbnail, '-map', '0', '-map', '1', '-c', 'copy', '-disposition:v', 'attached_pic', very_new_file ] # CLI arguments are present if new_file_args: # try to execute res = await _execute_args_check_file(new_file_args, very_new_file) # check if success if not res: new_name = very_new_file # failure else: await event.reply(message='Couldn\'t set album art cover. Won\'t be beautiful, but will still play.\n\n%s' % res) # send file await event.reply(message='Audio is downloaded, uploading it to Telegram...') try: await event.client.send_file( entity=peer, file=new_name, caption='This track is downloaded using tg-utility\n%s' % data['url'], mime_type=utils.get_mime(data['ext']), file_size=video_data['size'], thumb=thumbnail, attributes=[DocumentAttributeAudio( duration=video_data['duration'], title=title, performer=author )] ) except: await event.reply(message='Failed to upload audio to telegram!\n\n%s' % traceback.format_exc()) # video else: # generate the thumbnail thumb_name = 'mvd_temp/%s.jpg' % code if not await _generate_thumb(new_name, int(video_data['duration'] * 0.75), thumb_name): utils.rm_glob('mvd_temp/%s.*' % code) await event.reply(message='Failed to generate media thumbnail') return # log await event.reply(message='Media is downloaded, thumbnail is generated, uploading it to Telegram...') # send file try: await event.client.send_file( entity=peer, file=new_name, caption='%s' % data['url'], mime_type=utils.get_mime(data['ext']), file_size=video_data['size'], thumb=thumb_name, supports_streaming=True, attributes=[DocumentAttributeVideo( duration=video_data['duration'], w=video_data['width'], h=video_data['height'], supports_streaming=True )] ) except: await event.reply(message='Failed to upload media to telegram!\n\n%s' % traceback.format_exc()) utils.rm_glob('mvd_temp/%s.*' % code) except: utils.pex()