- changed thumbnail size for audio to 1000x1000 at most - .m4a and .aac now get thumbnail integrated into them - ability to set custom thumbnail for audio - changed audio caption
624 lines
22 KiB
Python
624 lines
22 KiB
Python
''' mod_video_downloader for tg-utility '''
|
|
|
|
import json
|
|
import time
|
|
import select
|
|
import shutil
|
|
import asyncio
|
|
import traceback
|
|
import subprocess
|
|
import os
|
|
|
|
from telethon import events
|
|
from telethon.tl.types import PeerUser
|
|
from telethon.tl.types import DocumentAttributeVideo
|
|
from telethon.tl.types import DocumentAttributeAudio
|
|
|
|
import utils
|
|
|
|
# application config
|
|
_config = None
|
|
# cached video qualities
|
|
_cached_qualities = {
|
|
'03dcdde34c18d6128cf906a10365f014': {
|
|
'url': 'https://www.pornhub.com/view_video.php?viewkey=65edb3a6aede0',
|
|
'format': '240p',
|
|
'ext': 'mp4'
|
|
}
|
|
}
|
|
|
|
|
|
def _get_all_qualities_raw(url: str, proxy: bool, timeout: float = 20) -> dict | str:
    ''' Get all media qualities of URL as dict (blocking)

    Runs `yt-dlp -J URL` (prefixed with `proxychains4` when `proxy` is set)
    and turns every reported format into an entry.  Each entry is stored in
    the module-level `_cached_qualities` under a code computed by
    `utils.get_md5` from the entry's fields, and is also put in the result.

    Returns `{code: entry}` on success or a str describing the failure.
    Ordinary errors are returned as text instead of being raised.
    '''
    try:
        # prepare arguments
        args = [utils.which('yt-dlp'), '-J', url]
        if proxy:
            args = [utils.which('proxychains4')] + args
        # start the process
        cp = subprocess.run(args, capture_output=True, timeout=timeout)
        # check the result
        try:
            j = json.loads(cp.stdout.decode(encoding='utf-8'))
        except ValueError:
            # ValueError covers both invalid JSON and undecodable bytes;
            # stderr is decoded leniently so reporting the error cannot fail
            err = cp.stderr.decode(encoding='utf-8', errors='replace')
            return err or 'yt-dlp has failed to get the list of qualities'
        if not j or not isinstance(j, dict):
            return 'yt-dlp has failed to get the list of qualities'

        # dict to return
        res = {}
        title = j.get('title', 'Без названия')
        author = j.get('channel', 'Неизвестен')
        thumbnail = j.get('thumbnail')
        # one format only - convert to many
        formats = j.get('formats')
        if formats is None:
            single = {
                'format': j['format'],
                'format_id': j['format_id'],
                'ext': j['ext'],
                'video_ext': j['video_ext'],
                'audio_ext': j['audio_ext']
            }
            if 'filesize' in j:
                single['filesize'] = j['filesize']
            formats = [single]
        # check all formats
        for f in formats:
            # NOTE: key order matters - the code below is a hash of the
            # values in insertion order
            obj = {
                'title': title,
                'author': author,
                'url': url,
                'format': f['format_id'],
                'thumbnail': thumbnail,
                'ext': f['ext'],
                # fall back instead of failing the whole listing when a
                # format lacks these optional descriptions
                'format_name': f.get('format', f['format_id']),
                'is_audio': f.get('audio_ext', 'none') != 'none',
                '_added': time.time()
            }
            # filesize exists (it may be present but null - then skip it)
            try:
                if 'filesize' in f:
                    obj['filesize'] = int(f['filesize'])
            except (TypeError, ValueError):
                pass
            # keys starting with '_' are bookkeeping and are not hashed
            cache_id = utils.get_md5('\n'.join([str(obj[i]) for i in obj if i[0] != '_']))
            _cached_qualities[cache_id] = obj
            res[cache_id] = obj
        return res
    except subprocess.TimeoutExpired:
        return 'Request timed out'
    except Exception:
        return traceback.format_exc()
|
|
|
|
async def _get_all_qualities(url: str, proxy: bool, timeout: float = 20) -> dict | str:
    ''' Async version of `_get_all_qualities_raw`, run in a worker thread

    Returns what the blocking function returns, or a traceback text when
    starting or awaiting the thread fails.
    '''
    try:
        return await asyncio.to_thread(_get_all_qualities_raw, url, proxy, timeout)
    except Exception:
        # not a bare except: task cancellation must propagate to the caller
        return traceback.format_exc()
|
|
|
|
def _download_video_raw(url: str, quality_code : str, path: str, proxy: bool, timeout: float = 20) -> str | bool:
    ''' Download media from URL to path (blocking)

    `quality_code` is looked up in `_cached_qualities` to get the yt-dlp
    format id.  yt-dlp output is watched for `[download] NN%` lines; when
    the percentage has not changed for `timeout` seconds the download is
    considered stuck.

    Returns True when yt-dlp has exited and `path` exists, otherwise a str
    describing the failure.  Ordinary errors are returned, not raised.
    '''
    cp = None
    try:
        # no such quality cached
        if quality_code not in _cached_qualities:
            return 'Not found quality for specified CODE!'
        # get data
        data = _cached_qualities[quality_code]

        # prepare arguments
        args = [
            utils.which('yt-dlp'),
            '--newline',
            '-f',
            data['format'],
            '-o',
            path,
            url
        ]
        if proxy:
            args = [utils.which('proxychains4')] + args
        # start the process
        cp = subprocess.Popen(
            args,
            bufsize=1,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            universal_newlines=True
        )
        last_update_time = time.time()
        last_progress = ''
        while True:
            # drain everything yt-dlp has printed so far
            ready = True
            while ready:
                # check if terminated
                if cp.poll() is not None:
                    if os.path.isfile(path):
                        return True
                    return 'youtube-dl has exited but output file does not exist'
                ready, _, _ = select.select([cp.stdout], [], [], 0.1)
                if not ready:
                    break
                raw = cp.stdout.readline()
                if not raw:
                    # EOF: the pipe stays "ready" forever, so stop draining
                    # and let the outer loop sleep and poll
                    break
                line = raw.strip()
                # only '[download] ...' lines carry progress
                if not line.startswith('[download] '):
                    continue
                parts = [i for i in line.replace('[download]', '').split(' ') if i]
                # no percent - not a progress line
                if not parts or '%' not in parts[0]:
                    continue
                # progress has changed - the download is alive
                if parts[0] != last_progress:
                    last_progress = parts[0]
                    last_update_time = time.time()
            time.sleep(1)
            # timed out
            if time.time() - last_update_time >= timeout:
                return "Timed out"
    except Exception:
        return traceback.format_exc()
    finally:
        # never leave yt-dlp running (or its pipe open) behind us: the
        # caller deletes the partial file right after a failure
        if cp is not None:
            if cp.poll() is None:
                cp.kill()
                cp.wait()
            if cp.stdout is not None:
                cp.stdout.close()
|
|
|
|
async def _download_video(url: str, quality_code : str, path: str, proxy: bool, timeout: float = 20) -> str | bool:
    ''' Async version of `_download_video_raw`, run in a worker thread

    Returns what the blocking function returns, or a traceback text when
    starting or awaiting the thread fails.
    '''
    try:
        return await asyncio.to_thread(_download_video_raw, url, quality_code, path, proxy, timeout)
    except Exception:
        # not a bare except: task cancellation must propagate to the caller
        return traceback.format_exc()
|
|
|
|
def _get_video_data_raw(path: str, audio: bool) -> dict | str:
    ''' Get media duration, file size and (for video) width and height

    Runs `ffprobe` on the first audio stream (`audio` is True) or the first
    video stream of `path` and parses its value-per-line output.

    Returns a dict with int values - 'duration' and 'size', plus 'width'
    and 'height' for video - or a str describing the failure.
    '''
    try:
        # prepare arguments
        args = [
            utils.which('ffprobe'),
            '-v',
            'error',
            '-select_streams',
            'a:0' if audio else 'v:0',
            '-show_entries',
            'stream=duration' if audio else 'stream=width,height,duration',
            '-show_entries',
            'format=size',
            '-of',
            'default=noprint_wrappers=1:nokey=1',
            path
        ]
        # start the process
        cp = subprocess.run(args, capture_output=True, timeout=10)
        # one value per line, stream entries first, then the format size
        txt = cp.stdout.decode(encoding='utf-8', errors='replace')
        parts = [i.strip() for i in txt.split('\n') if i.strip()]
        # too few values: ffprobe has failed or the stream does not exist,
        # report its own message instead of an IndexError traceback
        if len(parts) < (2 if audio else 4):
            err = cp.stderr.decode(encoding='utf-8', errors='replace').strip()
            return err or 'ffprobe returned no data for the requested stream'
        # result for audio
        if audio:
            return {
                'duration': int(float(parts[0])),
                'size': int(parts[1])
            }
        # result for video
        return {
            'width': int(parts[0]),
            'height': int(parts[1]),
            'duration': int(float(parts[2])),
            'size': int(parts[3])
        }
    except subprocess.TimeoutExpired:
        return 'ffprobe timed out'
    except Exception:
        return traceback.format_exc()
|
|
|
|
async def _get_video_data(path: str, audio: bool) -> dict | str:
    ''' Async version of `_get_video_data_raw`, run in a worker thread

    Returns what the blocking function returns, or a traceback text when
    starting or awaiting the thread fails.
    '''
    try:
        return await asyncio.to_thread(_get_video_data_raw, path, audio)
    except Exception:
        # not a bare except: task cancellation must propagate to the caller
        return traceback.format_exc()
|
|
|
|
def _generate_thumb_raw(video: str, timestamp: int, thumb: str) -> bool:
    ''' Generates a thumbnail for Telegram (blocking)

    Asks `ffmpeg` to take one representative frame of `video` near
    `timestamp` (seconds), scaled down to fit 280x280, and write it to
    `thumb`.

    Returns True when `thumb` exists afterwards and False on ANY failure
    (including a timeout).  An error text is never returned, because the
    caller only tests the truthiness of the result.
    '''
    try:
        # prepare arguments
        args = [
            utils.which('ffmpeg'),
            '-ss',
            str(timestamp),
            '-i',
            video,
            '-vf',
            'thumbnail,scale=\'min(280,iw)\':\'min(280,ih)\':force_original_aspect_ratio=decrease',
            '-frames:v',
            '1',
            thumb
        ]
        # start the process; its output is not needed, only the file
        subprocess.run(args, capture_output=True, timeout=10)
        return os.path.isfile(thumb)
    except subprocess.TimeoutExpired:
        print('[E] ffmpeg timed out while generating a thumbnail')
        return False
    except Exception:
        # keep the details in the log, the result stays a plain bool
        traceback.print_exc()
        return False
|
|
|
|
async def _generate_thumb(video: str, timestamp: int, thumb: str) -> bool:
    ''' Async version of `_generate_thumb_raw`, run in a worker thread

    Returns True only when the thumbnail has been generated.  Every failure
    gives False, so that `if not await _generate_thumb(...)` in the caller
    really detects errors.
    '''
    try:
        # `is True` also rejects an error text, should the blocking
        # function ever return one
        return (await asyncio.to_thread(_generate_thumb_raw, video, timestamp, thumb)) is True
    except Exception:
        # not a bare except: task cancellation must propagate to the caller
        traceback.print_exc()
        return False
|
|
|
|
def _download_thumb_for_tg_raw(url: str, proxy: bool, thumb: str) -> str | None:
    ''' Downloads a thumbnail for Telegram (blocking)

    Lets `ffmpeg` (prefixed with `proxychains4` when `proxy` is set) read
    the image at `url`, scale it down to fit 1000x1000 and write it to
    `thumb`.

    Returns None on success (the file `thumb` exists) or a non-empty str
    describing the failure.
    '''
    try:
        # prepare arguments
        args = [
            utils.which('ffmpeg'),
            '-i',
            url,
            '-vf',
            'thumbnail,scale=\'min(1000,iw)\':\'min(1000,ih)\':force_original_aspect_ratio=decrease',
            thumb
        ]
        if proxy:
            args = [utils.which('proxychains4')] + args
        # start the process
        cp = subprocess.run(args, capture_output=True, timeout=10)
        # success means the file is there
        if os.path.isfile(thumb):
            return None
        # failure: report the end of ffmpeg's stderr (the actual error is
        # printed last; the tail keeps the reply reasonably short)
        err = cp.stderr.decode(encoding='utf-8', errors='replace').strip()
        return err[-1500:] or 'ffmpeg has exited but thumbnail file does not exist'
    except subprocess.TimeoutExpired:
        return 'ffmpeg timed out'
    except Exception:
        return traceback.format_exc()
|
|
|
|
async def _download_thumb_for_tg(url: str, proxy: bool, thumb: str) -> str | None:
    ''' Async version of `_download_thumb_for_tg_raw`, run in a worker thread

    Returns what the blocking function returns, or a traceback text when
    starting or awaiting the thread fails.
    '''
    try:
        return await asyncio.to_thread(_download_thumb_for_tg_raw, url, proxy, thumb)
    except Exception:
        # not a bare except: task cancellation must propagate to the caller
        return traceback.format_exc()
|
|
|
|
def _execute_args_check_file_raw(args: list[str], file_to_check: str) -> None | str:
|
|
''' Downloads a thumbnail for Telegram '''
|
|
try:
|
|
# start the process
|
|
cp = subprocess.run(
|
|
args,
|
|
capture_output=True,
|
|
timeout=30
|
|
)
|
|
# file exists?
|
|
if os.path.isfile(file_to_check):
|
|
return None
|
|
# check the result
|
|
txt = None
|
|
try:
|
|
txt = cp.stderr.decode(encoding='ascii').strip()
|
|
return txt
|
|
except:
|
|
pass
|
|
except subprocess.TimeoutExpired:
|
|
return 'Command has timed out'
|
|
except:
|
|
return traceback.format_exc()
|
|
return 'Failed to execute command'
|
|
|
|
async def _execute_args_check_file(args: list[str], file_to_check: str) -> str | None:
    ''' Async version of `_execute_args_check_file_raw`, run in a worker thread

    Returns what the blocking function returns, or a traceback text when
    starting or awaiting the thread fails.
    '''
    try:
        return await asyncio.to_thread(_execute_args_check_file_raw, args, file_to_check)
    except Exception:
        # not a bare except: task cancellation must propagate to the caller
        return traceback.format_exc()
|
|
|
|
async def mod_init(config: dict) -> bool:
    ''' Initialize the mod: remember the config and recreate the temp dir

    NOTE(review): the signature promises a bool, but nothing is returned
    (the result is None), exactly as before - confirm what the loader
    expects before changing it.
    '''
    global _config
    _config = config
    # delete old temp; best effort - the directory may simply not exist
    shutil.rmtree('mvd_temp', ignore_errors=True)
    utils.ensure_dir('mvd_temp')
    print('[I] mod_video_downloader is initialized')
|
|
|
|
async def mod_deinit() -> None:
    ''' Deinitialize the mod: remove the temp directory '''
    # delete temp; best effort - a missing or undeletable directory must
    # not break the shutdown
    shutil.rmtree('mvd_temp', ignore_errors=True)
    print('[I] mod_video_downloader is deinitialized')
|
|
|
|
def mod_get_mighty() -> bool:
    ''' Tell whether the mod is 'mighty', i.e. receives all messages '''
    # this mod is not mighty
    is_mighty = False
    return is_mighty
|
|
|
|
def mod_get_tags() -> list[str]:
    ''' Get tags (command words) used by the mod

    'md' is the help command; 'mvdl'/'madl' list video/audio qualities and
    'mdd' downloads; the variants ending with 'p' do the same via proxy.
    '''
    return ['md', 'mvdl', 'mvdlp', 'madl', 'madlp', 'mdd', 'mddp']
|
|
|
|
async def _reply_with_qualities(event, cmd: str, args: list[str], audio: bool) -> None:
    ''' Reply with the list of audio (`audio` is True) or video qualities of the URL in args '''
    if not args:
        await event.reply(message='No URL!')
        return
    await event.reply(message='Checking URL... Please wait, you\'ll be notified if an error happens!')
    use_proxy = cmd[-1] == 'p'
    qualities = await _get_all_qualities(' '.join(args), use_proxy)
    # error
    if isinstance(qualities, str):
        await event.reply(message='Error:\n\n%s' % qualities)
        return
    # extensions to ignore
    ignored_ext = ('webm',) if audio else ('webm', 'mhtml')
    # success
    result = 'Qualities:'
    for qid, data in qualities.items():
        # wrong kind of media
        if bool(data['is_audio']) != audio:
            continue
        if data['ext'] in ignored_ext:
            continue
        # the download command keeps the proxy choice of the listing command
        result += '\n\n<code>mdd%s %s</code>' % ('p' if use_proxy else '', qid)
        result += '\n- Format name: %s' % data['format_name']
        result += '\n- Extension: %s' % data['ext']
        if 'filesize' in data:
            result += '\n- Filesize: %s KB' % (data['filesize'] / 1000)
    await event.reply(message=result, parse_mode='HTML')


async def _send_audio(event, peer, args: list[str], use_proxy: bool, code: str, data: dict, file_name: str, media_data: dict) -> None:
    ''' Prepare the album art of the downloaded audio file and upload the track '''
    # text after CODE is 'TITLE § PERFORMER § ALBUM ART URL', all optional
    remains = [i.strip() for i in ' '.join(args[1:]).split('§') if i.strip()]
    title = data['title']
    author = 'tg-utility & %s' % data['author']
    thumbnail = data['thumbnail']
    # use title, author and thumbnail from user message
    if remains:
        title = remains[0]
    if len(remains) >= 2:
        author = remains[1]
    if len(remains) >= 3:
        thumbnail = remains[2]
    # thumbnail exists, download it and change scale
    if thumbnail:
        thumb_path = 'mvd_temp/%s.jpg' % code
        error = await _download_thumb_for_tg(thumbnail, use_proxy, thumb_path)
        # trust the file, not only the returned value
        if error or not os.path.isfile(thumb_path):
            await event.reply(message='WARNING (audio is still being processed - THIS IS NOT AN ERROR). Failed to download thumbnail.\n\n%s' % (error or 'thumbnail file was not created'))
            thumbnail = None
        else:
            thumbnail = thumb_path
    # thumbnail really exists, embed it into the containers ffmpeg is asked to handle
    if thumbnail and data['ext'] in ('m4a', 'aac'):
        covered_name = 'mvd_temp/%s.covered.%s' % (code, data['ext'])
        cover_args = [
            utils.which('ffmpeg'),
            '-i',
            file_name,
            '-i',
            thumbnail,
            '-map',
            '0',
            '-map',
            '1',
            '-c',
            'copy',
            '-disposition:v',
            'attached_pic',
            covered_name
        ]
        res = await _execute_args_check_file(cover_args, covered_name)
        # success: upload the covered file instead of the plain one
        if not res and os.path.isfile(covered_name):
            file_name = covered_name
        # failure is not fatal
        else:
            await event.reply(message='Couldn\'t set album art cover. Won\'t be beautiful, but will still play.\n\n%s' % (res or 'output file was not created'))
    # send file
    await event.reply(message='Audio is downloaded, uploading it to Telegram...')
    try:
        await event.client.send_file(
            entity=peer,
            file=file_name,
            caption='This track is downloaded using tg-utility\n%s' % data['url'],
            mime_type=utils.get_mime(data['ext']),
            file_size=media_data['size'],
            thumb=thumbnail,
            attributes=[DocumentAttributeAudio(
                duration=media_data['duration'],
                title=title,
                performer=author
            )]
        )
    except Exception:
        await event.reply(message='Failed to upload audio to telegram!\n\n%s' % traceback.format_exc())


async def _send_video(event, peer, code: str, data: dict, file_name: str, media_data: dict) -> None:
    ''' Generate a thumbnail for the downloaded video file and upload the video '''
    # generate the thumbnail from a frame at 75% of the duration
    thumb_name = 'mvd_temp/%s.jpg' % code
    generated = await _generate_thumb(file_name, int(media_data['duration'] * 0.75), thumb_name)
    # anything but True is a failure (an error text would be truthy too)
    if generated is not True:
        await event.reply(message='Failed to generate media thumbnail')
        return
    # log
    await event.reply(message='Media is downloaded, thumbnail is generated, uploading it to Telegram...')
    # send file
    try:
        await event.client.send_file(
            entity=peer,
            file=file_name,
            caption='%s' % data['url'],
            mime_type=utils.get_mime(data['ext']),
            file_size=media_data['size'],
            thumb=thumb_name,
            supports_streaming=True,
            attributes=[DocumentAttributeVideo(
                duration=media_data['duration'],
                w=media_data['width'],
                h=media_data['height'],
                supports_streaming=True
            )]
        )
    except Exception:
        await event.reply(message='Failed to upload media to telegram!\n\n%s' % traceback.format_exc())


async def _download_and_send(event, peer, cmd: str, args: list[str]) -> None:
    ''' Download the media of the cached CODE in args and upload it to the chat '''
    if not args:
        await event.reply(message='No CODE!')
        return
    # get the code and check it
    code = args[0]
    if code not in _cached_qualities:
        await event.reply(message='This code does not exist. Use \'mvdl[p]\' to obtain the code.')
        return
    data = _cached_qualities[code]
    use_proxy = cmd[-1] == 'p'
    try:
        await event.reply(message='Downloading the media... Please wait, you\'ll be notified if an error happens!')
        # yt-dlp writes to a neutral name, the real extension is set later
        old_name = 'mvd_temp/%s.bin' % code
        res = await _download_video(data['url'], code, old_name, use_proxy)
        # res is str - error
        if isinstance(res, str):
            await event.reply(message='Failed to download media: %s' % res)
            return
        # res is false
        if not res:
            await event.reply(message='Something went wrong during downloading...')
            return
        # rename
        new_name = 'mvd_temp/%s.%s' % (code, data['ext'])
        try:
            shutil.move(old_name, new_name)
        except Exception:
            await event.reply(message='Failed to rename downloaded media')
            return
        # get media data
        media_data = await _get_video_data(new_name, data['is_audio'])
        if not isinstance(media_data, dict):
            await event.reply(message='Failed to use \'ffprobe\' to get media data:\n\n%s' % media_data)
            return
        if data['is_audio']:
            await _send_audio(event, peer, args, use_proxy, code, data, new_name, media_data)
        else:
            await _send_video(event, peer, code, data, new_name, media_data)
    finally:
        # temp files of this code are removed whatever has happened above
        utils.rm_glob('mvd_temp/%s.*' % code)


async def mod_new_message(session, event) -> None:
    ''' Handle new message

    Recognized commands (first word of the message, case-insensitive):
    'md' - help, 'mvdl[p] URL' / 'madl[p] URL' - list video / audio
    qualities, 'mdd[p] CODE ...' - download and upload.  A trailing 'p'
    selects the proxy.  Only private (user) chats are served.  Errors are
    reported through `utils.pex()` and are not raised.
    '''
    try:
        # get the message
        msg = event.message
        # NOTE: incoming messages are processed too - the "outgoing only"
        # check is intentionally not applied here
        # peer must be user
        peer = msg.peer_id
        if type(peer) is not PeerUser:
            return

        # get the text and split it to command and args
        text = msg.message or ''
        words = [i for i in text.split(' ') if i]
        # nothing to parse (e.g. media without caption)
        if not words:
            return
        cmd = words[0].lower()
        args = words[1:]
        await asyncio.sleep(0.5)
        # help
        if cmd == 'md':
            response_text = ''.join([
                'mod_audio_video_downloader:',
                '\n- mvdl[p] [URL] - get list of all video qualities',
                '\n- madl[p] [URL] - get list of all audio qualities',
                '\n- mdd[p] [CODE] [TITLE] [§ PERFORMER] [§ ALBUM ART URL] - download video or audio (for audio only: track title)',
                '\n\nUse \'p\' letter to utilize proxy'
            ])
            await event.reply(message=response_text)
        # list video qualities
        elif cmd.startswith('mvdl'):
            await _reply_with_qualities(event, cmd, args, False)
        # list audio qualities
        elif cmd.startswith('madl'):
            await _reply_with_qualities(event, cmd, args, True)
        # download
        elif cmd.startswith('mdd'):
            await _download_and_send(event, peer, cmd, args)
    except Exception:
        # not a bare except: task cancellation must propagate
        utils.pex()
|