Files
2025-linux-tg-utility/mod_video_downloader.py
Nikita Tyukalov, ASUS, Linux fce7968a72 Fixed video streaming
2025-11-22 23:46:00 +03:00

406 lines
13 KiB
Python

''' mod_video_downloader for tg-utility '''
import json
import time
import select
import shutil
import asyncio
import traceback
import subprocess
import os
from telethon import events
from telethon.tl.types import PeerUser
from telethon.tl.types import DocumentAttributeVideo
import utils
# application config
_config = None
# cached video qualities
_cached_qualities = {
'03dcdde34c18d6128cf906a10365f014': {
'url': 'https://www.pornhub.com/view_video.php?viewkey=65edb3a6aede0',
'format': '240p',
'ext': 'mp4'
}
}
def _get_all_qualities_raw(url: str, proxy: bool, timeout: float = 20) -> dict | str:
''' Get all video qualities as dict '''
try:
# prepare arguments
args = [
utils.which('youtube-dl'),
'-J',
url
]
if proxy:
args = [utils.which('proxychains4')] + args
# start the process
cp = subprocess.run(
args,
capture_output=True,
timeout=timeout
)
# check the result
txt = cp.stdout.decode(encoding='utf-8')
try:
j = json.loads(txt)
except:
return cp.stderr.decode(encoding='ascii')
# dict to return
res = {}
# one format only - convert to many
if 'formats' not in j:
j['formats'] = [{
'format_id': j['format_id'],
'ext': j['ext']
}]
# check all formats
for f in j['formats']:
obj = {
'url': url,
'format': f['format_id'],
'ext': f['ext'],
'_added': time.time()
}
cache_id = utils.get_md5('\n'.join([str(obj[i]) for i in obj if i[0] != '_']))
_cached_qualities[cache_id] = obj
res[cache_id] = obj
return res
except subprocess.TimeoutExpired:
return 'Request timed out'
except:
return traceback.format_exc()
async def _get_all_qualities(url: str, proxy: bool, timeout: float = 20) -> dict | str:
''' Async version '''
try:
return await asyncio.to_thread(_get_all_qualities_raw, url, proxy, timeout)
except:
return traceback.format_exc()
def _download_video_raw(url: str, quality_code : str, path: str, proxy: bool, timeout: float = 20) -> str | bool:
''' Download video from URL, use quality code to get info from cache '''
try:
# no such quality cached
if quality_code not in _cached_qualities:
return 'Not found quality for specified CODE!'
# get data
data = _cached_qualities[quality_code]
# prepare arguments
args = [
utils.which('youtube-dl'),
'--newline',
'-f',
data['format'],
'-o',
path,
url
]
if proxy:
args = [utils.which('proxychains4')] + args
# start the process
cp = subprocess.Popen(
args,
bufsize=1,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
universal_newlines=True
)
# check output every second
last_update_time = time.time()
last_progress = ''
while True:
lines = []
ready = True
while ready:
# check if terminated
if cp.poll() is not None:
# success
if os.path.isfile(path):
return True
# failure
else:
return 'youtube-dl has exited but output file does not exist'
break
ready, _, _ = select.select([cp.stdout], [], [], 0.1)
# data available
if ready:
l = cp.stdout.readline().strip()
# check if line starts with [download]
if not l.startswith('[download] '):
continue
# split
parts = [i for i in l.replace('[download]', '').split(' ') if i]
# no percent
if '%' not in parts[0]:
continue
# yes percent - that's progress
new_progress = parts[0]
# change
if new_progress != last_progress:
last_progress = new_progress
last_update_time = time.time()
time.sleep(1)
# timed out
if time.time() - last_update_time >= timeout:
return "Timed out"
except:
return traceback.format_exc()
async def _download_video(url: str, quality_code : str, path: str, proxy: bool, timeout: float = 20) -> str | bool:
''' Async version '''
try:
return await asyncio.to_thread(_download_video_raw, url, quality_code, path, proxy, timeout)
except:
return traceback.format_exc()
def _get_video_data_raw(path: str) -> dict | None:
''' Get video duration, width, height and file size '''
try:
# prepare arguments
args = [
utils.which('ffprobe'),
'-v',
'error',
'-select_streams',
'v:0',
'-show_entries',
'stream=width,height,duration',
'-show_entries',
'format=size',
'-of',
'default=noprint_wrappers=1:nokey=1',
path
]
# start the process
cp = subprocess.run(
args,
capture_output=True,
timeout=10
)
# check the result
txt = None
try:
txt = cp.stdout.decode(encoding='ascii').strip()
except:
return None
parts = [i.strip() for i in txt.split('\n') if i.strip()]
# result
return {
'width': int(parts[0]),
'height': int(parts[1]),
'duration': int(float(parts[2])),
'size': int(parts[3])
}
except subprocess.TimeoutExpired:
return 'ffprobe timed out'
except:
return traceback.format_exc()
async def _get_video_data(path: str) -> dict | None:
''' Async version '''
try:
return await asyncio.to_thread(_get_video_data_raw, path)
except:
return traceback.format_exc()
def _generate_thumb_raw(video: str, timestamp: int, thumb: str) -> bool:
''' Generates a thumbnail for Telegram '''
try:
# prepare arguments
args = [
utils.which('ffmpeg'),
'-ss',
str(timestamp),
'-i',
video,
'-vf',
'thumbnail,scale=\'min(320,iw)\':\'min(320,ih)\':force_original_aspect_ratio=decrease',
'-frames:v',
'1',
thumb
]
# start the process
cp = subprocess.run(
args,
capture_output=True,
timeout=10
)
# check the result
txt = None
try:
txt = cp.stdout.decode(encoding='ascii').strip()
txt = cp.stderr.decode(encoding='ascii').strip()
except:
pass
parts = [i.strip() for i in txt.split('\n') if i.strip()]
# result
return os.path.isfile(thumb)
except subprocess.TimeoutExpired:
return 'ffprobe timed out'
except:
return traceback.format_exc()
return False
async def _generate_thumb(video: str, timestamp: int, thumb: str) -> dict | None:
''' Async version '''
try:
return await asyncio.to_thread(_generate_thumb_raw, video, timestamp, thumb)
except:
return traceback.format_exc()
async def mod_init(config: dict) -> bool:
''' Initialize the mod '''
global _config
_config = config
# delete old temp
try:
shutil.rmtree('mvd_temp')
pass
except:
pass
utils.ensure_dir('mvd_temp')
print('[I] mod_video_downloader is initialized')
async def mod_deinit() -> None:
''' Deinitialize the mod '''
# delete temp
try:
shutil.rmtree('mvd_temp')
pass
except:
pass
print('[I] mod_video_downloader is deinitialized')
def mod_get_mighty() -> bool:
''' Mod is called 'mighty' if it receives all messages '''
return False
def mod_get_tags() -> None:
''' Get tags used by the mod '''
return ['mvd', 'mvdl', 'mvdlp', 'mvdd', 'mvddp']
async def mod_new_message(session, event) -> None:
''' Handle new message '''
try:
# get the message
msg = event.message
# not outgoing - do not process
if not msg.out:
return
# peer must be user
peer = msg.peer_id
if type(peer) is not PeerUser:
return
# get the text
text = msg.message
# get args
args = [i for i in text.split(' ') if i]
cmd = args[0].lower()
args = args[1:]
await asyncio.sleep(0.5)
# help
if cmd == 'mvd':
response_text = 'mod_video_downloader:'
response_text += '\n- mvdl[p] [URL] - get list of all video qualities'
response_text += '\n- mvdd[p] [CODE] - download video'
response_text += '\n\nUse \'p\' letter to utilize proxy'
await event.reply(message=response_text)
# list qualities
elif cmd.startswith('mvdl'):
if not args:
await event.reply(message='No URL!')
return
await event.reply(message='Checking URL... Please wait, you\'ll be notified if an error happens!')
qualities = await _get_all_qualities(' '.join(args), cmd[-1] == 'p')
# error
if type(qualities) is str:
await event.reply(message='Error:\n\n%s' % qualities)
return
# success
result = 'Qualities:'
for qid in qualities:
data = qualities[qid]
result += '\n\n<code>mvdd%s %s</code>' % ('p' if cmd[-1] == 'p' else '', qid)
result += '\n- Format: %s' % data['format']
result += '\n- Extension: %s' % data['ext']
await event.reply(message=result, parse_mode='HTML')
# download
elif cmd.startswith('mvdd'):
if not args:
await event.reply(message='No CODE!')
return
# get the code and check it
code = args[-1]
if code not in _cached_qualities:
await event.reply(message='This code does not exist. Use \'mvdl[p]\' to obtain the code.')
return
# get video data
data = _cached_qualities[code]
await event.reply(message='Downloading the video... Please wait, you\'ll be notified if an error happens!')
res = await _download_video(data['url'], code, 'mvd_temp/%s.bin' % code, cmd[-1] == 'p')
# res is str - error
if type(res) is str:
utils.rm_glob('mvd_temp/%s.*' % code)
await event.reply(message='Failed to download video: %s' % res)
return
# res is false
if not res:
utils.rm_glob('mvd_temp/%s.*' % code)
await event.reply(message='Something went wrong during downloading...' % res)
return
# old name and new name
old_name = 'mvd_temp/%s.bin' % code
new_name = 'mvd_temp/%s.%s' % (code, data['ext'])
# rename
try:
shutil.move(old_name, new_name)
pass
except:
utils.rm_glob('mvd_temp/%s.*' % code)
await event.reply(message='Failed to rename downloaded video')
return
# get video data
video_data = await _get_video_data(new_name)
if type(video_data) is not dict:
utils.rm_glob('mvd_temp/%s.*' % code)
await event.reply(message='Failed to use \'ffprobe\' to get video data')
return
# generate the thumbnail
thumb_name = 'mvd_temp/%s.jpg' % code
if not await _generate_thumb(new_name, int(video_data['duration'] * 0.75), thumb_name):
utils.rm_glob('mvd_temp/%s.*' % code)
await event.reply(message='Failed to generate video thumbnail')
return
# log
await event.reply(message='Video is downloaded, thumbnail is generated, uploading it to Telegram...')
# send file
try:
await event.client.send_file(
entity=peer,
file=new_name,
caption='%s' % data['url'],
mime_type=utils.get_mime(data['ext']),
file_size=video_data['size'],
thumb=thumb_name,
supports_streaming=True,
attributes=[DocumentAttributeVideo(
duration=video_data['duration'],
w=video_data['width'],
h=video_data['height'],
supports_streaming=True
)]
)
except:
await event.reply(message='Failed to upload video to telegram!')
utils.rm_glob('mvd_temp/%s.*' % code)
except:
utils.pex()