Spaces:
Sleeping
Sleeping
import requests, hashlib, os | |
from others import * | |
api_url = 'https://api.iwara.tv' | |
file_url = 'https://files.iwara.tv' | |
class BearerAuth(requests.auth.AuthBase): | |
"""Bearer Authentication""" | |
def __init__(self, token): | |
self.token = token | |
def __call__(self, r): | |
r.headers['Authorization'] = 'Bearer ' + self.token | |
return r | |
class ApiClient: | |
def __init__(self, email, password): | |
self.email = email | |
self.password = password | |
# self.headers = { | |
# 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36', | |
# 'X-Version': 's' | |
# } | |
# API | |
self.api_url = api_url | |
self.file_url = file_url | |
self.timeout = 30 | |
# self.max_retries = 5 | |
self.download_timeout = 300 | |
self.token = None | |
# HTML | |
# self.html_url = html_url | |
# Cloudscraper | |
# self.scraper = cloudscraper.create_scraper(browser={'browser': 'firefox','platform': 'windows','mobile': False}, | |
# # interpreter = 'nodejs' | |
# ) | |
# Requests-html | |
# self.session = HTMLSession() | |
def login(self) -> requests.Response: | |
url = self.api_url + '/user/login' | |
json = {'email': self.email, 'password': self.password} | |
r = requests.post(url, json=json, timeout=self.timeout) | |
try: | |
self.token = r.json()['token'] | |
print('API Login success') | |
except: | |
print('API Login failed') | |
# try: | |
# # Cloudscraper | |
# # r = self.scraper.post(url, json=json, headers=self.headers, timeout=self.timeout) | |
# # Requests-html | |
# r = self.session.post(url, json=json, headers=self.headers, timeout=self.timeout) | |
# except: | |
# print('BS4 Login failed') | |
return r | |
# limit query is not working | |
def get_videos(self, sort = 'date', rating = 'all', page = 0, limit = 32, subscribed = False) -> requests.Response: | |
"""# Get new videos from iwara.tv | |
- sort: date, trending, popularity, views, likes | |
- rating: all, general, ecchi | |
""" | |
url = self.api_url + '/videos' | |
params = {'sort': sort, | |
'rating': rating, | |
'page': page, | |
'limit': limit, | |
'subscribed': 'true' if subscribed else 'false', | |
} | |
if self.token is None: | |
r = requests.get(url, params=params, timeout=self.timeout) | |
else: | |
# Verbose Debug | |
# request = requests.Request('GET', url, params=params, auth=BearerAuth(self.token)) | |
# print(request.prepare().method, request.prepare().url, request.prepare().headers, request.prepare().body, sep='\n') | |
# r = requests.Session().send(request.prepare()) | |
r = requests.get(url, params=params, auth=BearerAuth(self.token), timeout=self.timeout) | |
#Debug | |
print("[DEBUG] get_videos response:", r) | |
return r | |
def get_video(self, video_id) -> requests.Response: | |
"""# Get video info from iwara.tv | |
""" | |
url = self.api_url + '/video/' + video_id | |
if self.token is None: | |
r = requests.get(url, timeout=self.timeout) | |
else: | |
r = requests.get(url, auth=BearerAuth(self.token), timeout=self.timeout) | |
#Debug | |
print("[DEBUG] get_video response:", r) | |
return r | |
def download_video_thumbnail(self, video_id) -> str: | |
"""# Download video thumbnail from iwara.tv | |
""" | |
video = self.get_video(video_id).json() | |
file_id = video['file']['id'] | |
thumbnail_id = video['thumbnail'] | |
url = self.file_url + '/image/original/' + file_id + '/thumbnail-{:02d}.jpg'.format(thumbnail_id) | |
thumbnail_file_name = video_id + '.jpg' | |
if (os.path.exists(thumbnail_file_name)): | |
print(f"Video ID {video_id} thumbnail already downloaded, skipped downloading. ") | |
return thumbnail_file_name | |
print(f"Downloading thumbnail for video ID: {video_id} ...") | |
with open(thumbnail_file_name, "wb") as f: | |
for chunk in requests.get(url).iter_content(chunk_size=1024): | |
if chunk: | |
f.write(chunk) | |
f.flush() | |
return thumbnail_file_name | |
def download_video(self, video_id) -> str: | |
"""# Download video from iwara.tv | |
""" | |
# html | |
# url = self.html_url + '/video/' + video_id | |
# Cloudscraer | |
# html = self.scraper.get(url, auth=BearerAuth(self.token), timeout=self.timeout).text | |
# Requests-html | |
# html = self.session.get(url, auth=BearerAuth(self.token), timeout=self.timeout).text | |
# print(html) | |
# html = BeautifulSoup(, 'html.parser') | |
# downloadLink = html.find('div', class_='dropdown_content') | |
# print(downloadLink) | |
# API | |
try: | |
video = self.get_video(video_id).json() | |
except Exception as e: | |
raise Exception(f"Failed to get video info for video ID: {video_id}, error: {e}") | |
#Debug | |
print(video) | |
url = video['fileUrl'] | |
file_id = video['file']['id'] | |
expires = url.split('/')[4].split('?')[1].split('&')[0].split('=')[1] | |
# IMPORTANT: This might change in the future. | |
SHA_postfix = "_5nFp9kmbNnHdAFhaqMvt" | |
SHA_key = file_id + "_" + expires + SHA_postfix | |
hash = hashlib.sha1(SHA_key.encode('utf-8')).hexdigest() | |
headers = {"X-Version": hash} | |
resources = requests.get(url, headers=headers, auth=BearerAuth(self.token), timeout=self.timeout).json() | |
#Debug | |
print(resources) | |
resources_by_quality = [None for i in range(10)] | |
for resource in resources: | |
if resource['name'] == 'Source': | |
resources_by_quality[0] = resource | |
# elif resource['name'] == '1080': | |
# resources_by_quality[1] = resource | |
# elif resource['name'] == '720': | |
# resources_by_quality[2] = resource | |
# elif resource['name'] == '480': | |
# resources_by_quality[3] = resource | |
# elif resource['name'] == '540': | |
# resources_by_quality[4] = resource | |
# elif resource['name'] == '360': | |
# resources_by_quality[5] = resource | |
for resource in resources_by_quality: | |
if resource is not None: | |
#Debug | |
print(resource) | |
download_link = "https:" + resource['src']['download'] | |
file_type = resource['type'].split('/')[1] | |
video_file_name = video_id + '.' + file_type | |
if (os.path.exists(video_file_name)): | |
print(f"Video ID {video_id} Already downloaded, skipped downloading. ") | |
return video_file_name | |
print(f"Downloading video ID: {video_id} ...") | |
try: | |
with open(video_file_name, "wb") as f: | |
for chunk in requests.get(download_link).iter_content(chunk_size=1024): | |
if chunk: | |
f.write(chunk) | |
f.flush() | |
return video_file_name | |
except Exception as e: | |
os.remove(video_file_name) | |
raise Exception(f"Failed to download video ID: {video_id}, error: {e}") | |
raise Exception("No video with Source quality found") | |
# ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | |
# ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | |
### download video from iwara.tv | |
### usage: python iwara [url] | |
### by AngelBottomless @ github | |
# download from iwara page | |
import requests | |
# use selenium to get video url | |
from selenium import webdriver | |
import argparse | |
def download_video(url): | |
# save video to local | |
filename = url.split('/')[-1] + '.mp4' | |
# get video | |
driver = run_webdriver(url) | |
click_accept(driver) | |
driver.implicitly_wait(2) | |
click_play(driver) | |
url = find_video_url(driver) | |
# download video | |
r = requests.get(url) | |
with open(filename, 'wb') as f: | |
f.write(r.content) | |
# close driver | |
driver.close() | |
def download_with_retry(url, retry=3): | |
# retry download | |
for _ in range(retry): | |
try: | |
download_video(url) | |
return True | |
except: | |
print('download failed, retrying...') | |
continue | |
return False | |
def run_webdriver(url): | |
# use selenium to get video url | |
# mute chrome | |
chrome_options = webdriver.ChromeOptions() | |
chrome_options.add_argument("--mute-audio") | |
# run webdriver | |
driver = webdriver.Chrome(options=chrome_options) | |
driver.get(url) | |
driver.implicitly_wait(4) | |
return driver | |
def click_accept(driver): | |
# xpath = /html/body/div[3]/div/div[2]/button[1] | |
button = driver.find_element('xpath', '/html/body/div[3]/div/div[2]/button[1]') | |
button.click() | |
def click_play(driver): | |
# xpath = //*[@id="vjs_video_3"]/button | |
button = driver.find_element('xpath', '//*[@id="vjs_video_3"]/button') | |
button.click() | |
def find_video_url(driver): | |
# xpath //*[@id="vjs_video_3_html5_api"] | |
#access 'src' | |
video = driver.find_element('xpath', '//*[@id="vjs_video_3_html5_api"]') | |
video_url = video.get_attribute('src') | |
return video_url | |
def track_clipboard(): | |
import pyperclip | |
import time | |
import subprocess | |
failed_urls = [] | |
success_urls = set() | |
print('tracking clipboard...') | |
# loop to track clipboard | |
# if clipboard contains url, download video | |
# track every 1 second | |
previous = '' | |
# expect KeyboardInterrupt and return 0 | |
try: | |
while True: | |
# get clipboard | |
clipboard = pyperclip.paste() | |
if clipboard != previous: | |
# if clipboard contains url | |
if 'iwara.tv' in clipboard: | |
print('url detected, downloading...') | |
# use subprocess to download video in background | |
# ['python', '-m', 'iwara', clipboard] | |
subprocess.Popen(['python', '-m', 'iwara', clipboard]) | |
print('download complete') | |
previous = clipboard | |
time.sleep(1) | |
except KeyboardInterrupt: | |
print('exiting...') | |
return 0 | |
if __name__ == '__main__': | |
failed_urls = [] | |
success_urls = set() | |
import sys | |
# parse args | |
parser = argparse.ArgumentParser() | |
# track clipboard option, when 'track' is used, url is not required | |
parser.add_argument('-t', '--track', action='store_true', help='track clipboard for iwara url') | |
# add url argument, if not specified, use '' | |
parser.add_argument('url', nargs='?', default='', help='iwara url') | |
args = parser.parse_args() | |
# download video | |
if args.track: | |
track_clipboard() | |
elif 'iwara.tv' in args.url: | |
result = download_with_retry(args.url) | |
if not result: | |
print('download failed') | |
failed_urls.append(args.url) | |
else: | |
print('download complete') | |
success_urls.add(args.url) | |
if len(failed_urls) > 0: | |
print('failed urls:') | |
for url in failed_urls: | |
print(url) | |
# write in ./failed.txt | |
with open('failed.txt', 'a') as f: | |
f.write(url + '\n') | |
sys.exit(1) | |
else: | |
print('invalid url') | |
sys.exit(1) | |
# ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | |
def iwara(video_url, judul): | |
# Set the path to the thumbnail directory | |
directory = "/home/user/app/Iwara" | |
if not os.path.exists(directory): | |
os.makedirs(directory) | |
judul = judul.replace('_',' ').title().replace('Mmd','MMD').replace('/',' ').replace('Nikke','NIKKE').replace('Fate','FATE').replace('】','】 ').replace(' ', ' ') | |
thumbnail_url = 'https://saradahentai.com/wp-content/uploads/2023/03/Live-Footage-of-Ashley-Graham-Captured-by-fugtrup-Resident-Evil-4.jpg' | |
thumbnail_file = download_file(thumbnail_url, judul, directory) | |
video_file = download_file(video_url, judul, directory) | |
# Mengkonversi video | |
video_file = convert_videos(720, video_file) | |
video_info = f"Judul: {judul}\n" | |
return video_file, judul, video_info, thumbnail_file |