# Spaces: Running  (page-status residue from the hosting UI; not program code)
import gradio as gr | |
import cv2 | |
import tempfile | |
import os | |
import requests | |
import base64 | |
import random | |
import string | |
import json | |
import ast | |
import uuid | |
def check_nsfw(img_url):
    """Classify an image via the ``yoinked-da-nsfw-checker`` Space queue API.

    Posts a job to the Space's ``/queue/join`` endpoint, then reads the
    ``/queue/data`` event stream for the same session hash until it ends.

    Args:
        img_url: Image URL; sent to the Space as ``{'path': img_url}``.

    Returns:
        The ``label`` of the first output item found in the second-to-last
        ``data:`` segment of the event stream.

    Raises:
        requests.RequestException: On network failure, timeout, or an HTTP
            error status from either request.
        ValueError, KeyError, IndexError, TypeError: If the stream does not
            contain the expected JSON structure.
    """
    base_url = 'https://yoinked-da-nsfw-checker.hf.space'
    # requests has no default timeout; without one a stalled Space would
    # block this call forever. (connect seconds, read seconds)
    timeout = (10, 120)
    session_hash = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(7))
    data = {
        'data': [
            {'path': img_url},
            "chen-convnext",
            0.5,
            True,
            True
        ],
        'session_hash': session_hash,
        'fn_index': 0,
        'trigger_id': 12
    }
    r = requests.post(f'{base_url}/queue/join', json=data, timeout=timeout)
    r.raise_for_status()
    with requests.get(f'{base_url}/queue/data?session_hash={session_hash}', stream=True, timeout=timeout) as r:
        r.raise_for_status()
        # Join the raw bytes before decoding: decoding each 100-byte chunk on
        # its own fails when a multi-byte UTF-8 character straddles a boundary.
        buffer = b''.join(r.iter_content(100)).decode('utf-8')
    segments = buffer.split('data:')
    # The result is read from the second-to-last event.
    # NOTE(review): presumably the final event is a stream-close message - confirm.
    return json.loads(segments[len(segments) - 2])["output"]["data"][0]['label']
def check_nsfw2(img_url):
    """Classify an image via the ``jamescookjr90-falconsai-nsfw-image-detection`` Space.

    Posts a job to the Space's ``/queue/join`` endpoint, then reads the
    ``/queue/data`` event stream for the same session hash until it ends.

    Args:
        img_url: Image URL; sent to the Space as ``{'path': img_url}``.

    Returns:
        The ``label`` of the first output item found in the second-to-last
        ``data:`` segment of the event stream.

    Raises:
        requests.RequestException: On network failure, timeout, or an HTTP
            error status from either request.
        ValueError, KeyError, IndexError, TypeError: If the stream does not
            contain the expected JSON structure.
    """
    base_url = 'https://jamescookjr90-falconsai-nsfw-image-detection.hf.space'
    # requests has no default timeout; without one a stalled Space would
    # block this call forever. (connect seconds, read seconds)
    timeout = (10, 120)
    session_hash = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(7))
    data = {
        'data': [
            {'path': img_url}
        ],
        'session_hash': session_hash,
        'fn_index': 0,
        'trigger_id': 9
    }
    r = requests.post(f'{base_url}/queue/join', json=data, timeout=timeout)
    r.raise_for_status()
    with requests.get(f'{base_url}/queue/data?session_hash={session_hash}', stream=True, timeout=timeout) as r:
        r.raise_for_status()
        # Join the raw bytes before decoding: decoding each 100-byte chunk on
        # its own fails when a multi-byte UTF-8 character straddles a boundary.
        buffer = b''.join(r.iter_content(100)).decode('utf-8')
    segments = buffer.split('data:')
    # The result is read from the second-to-last event.
    # NOTE(review): presumably the final event is a stream-close message - confirm.
    return json.loads(segments[len(segments) - 2])["output"]["data"][0]['label']
def check_nsfw3(img_url):
    """Classify an image via the ``zanderlewis-xl-nsfw-detection`` Space.

    Uses the two-step ``/call/predict`` API: a POST returns an ``event_id``,
    and a GET on ``/call/predict/{event_id}`` streams the result events.

    Args:
        img_url: Image URL; sent to the Space as ``{'path': img_url}``.

    Returns:
        The ``label`` of the first item in the last ``data:`` segment of the
        event stream.

    Raises:
        requests.RequestException: On network failure, timeout, or an HTTP
            error status from either request.
        ValueError, KeyError, IndexError, TypeError: If the responses do not
            contain the expected structure.
    """
    predict_url = 'https://zanderlewis-xl-nsfw-detection.hf.space/call/predict'
    # requests has no default timeout; without one a stalled Space would
    # block this call forever. (connect seconds, read seconds)
    timeout = (10, 120)
    data = {
        'data': [
            {'path': img_url}
        ]
    }
    r = requests.post(predict_url, json=data, timeout=timeout)
    r.raise_for_status()
    event_id = r.json()['event_id']
    with requests.get(f'{predict_url}/{event_id}', stream=True, timeout=timeout) as r:
        r.raise_for_status()
        # Join the raw bytes before decoding: decoding each 100-byte chunk on
        # its own fails when a multi-byte UTF-8 character straddles a boundary.
        event_stream = b''.join(r.iter_content(100)).decode('utf-8')
    # The payload is JSON, so parse it as JSON: ast.literal_eval rejects the
    # JSON literals true/false/null that may appear in the output.
    return json.loads(event_stream.split('data:')[-1])[0]['label']
def get_replica_code(url):
    """Extract the replica identifier embedded in a page's text.

    Fetches ``url`` and returns the text found between the first
    ``replicas/`` marker and the following ``"};`` sequence.

    Args:
        url: Address of the page to fetch.

    Returns:
        The extracted replica code as a string, or ``None`` if the request
        fails or the marker is not present in the response.
    """
    try:
        # A timeout keeps a stalled host from blocking the caller forever.
        r = requests.get(url, timeout=30)
        return r.text.split('replicas/')[1].split('"};')[0]
    except Exception:
        # Best-effort lookup: any failure maps to None. Unlike a bare
        # ``except:``, this lets KeyboardInterrupt/SystemExit propagate.
        return None
def check_nsfw4(img_url):
    """Classify an image via the ``error466-falconsai-nsfw-image-detection`` Space.

    Looks up the Space's replica code with ``get_replica_code``, then uses the
    replica-scoped ``/queue/join`` and ``/queue/data`` endpoints.

    Args:
        img_url: Image URL; sent to the Space as ``{'path': img_url}``.

    Returns:
        The ``label`` of the first output item found in the last ``data:``
        segment of the event stream.

    Raises:
        RuntimeError: If the replica code cannot be determined.
        requests.RequestException: On network failure, timeout, or an HTTP
            error status from either request.
        ValueError, KeyError, IndexError, TypeError: If the stream does not
            contain the expected JSON structure.
    """
    base_url = 'https://error466-falconsai-nsfw-image-detection.hf.space'
    # requests has no default timeout; without one a stalled Space would
    # block this call forever. (connect seconds, read seconds)
    timeout = (10, 120)
    code = get_replica_code(base_url)
    if code is None:
        # Fail early instead of requesting a URL containing the text "None".
        raise RuntimeError(f'could not determine replica code for {base_url}')
    session_hash = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(7))
    data = {
        'data': [
            {'path': img_url}
        ],
        'session_hash': session_hash,
        'fn_index': 0,
        'trigger_id': 58
    }
    r = requests.post(f'{base_url}/--replicas/{code}/queue/join', json=data, timeout=timeout)
    r.raise_for_status()
    with requests.get(f'{base_url}/--replicas/{code}/queue/data?session_hash={session_hash}', stream=True, timeout=timeout) as r:
        r.raise_for_status()
        # Join the raw bytes before decoding: decoding each 100-byte chunk on
        # its own fails when a multi-byte UTF-8 character straddles a boundary.
        buffer = b''.join(r.iter_content(100)).decode('utf-8')
    return json.loads(buffer.split('data:')[-1])["output"]["data"][0]['label']
def check_nsfw5(img_url):
    """Classify an image via the ``phelpsgg-falconsai-nsfw-image-detection`` Space.

    Posts a job to the Space's ``/queue/join`` endpoint, then reads the
    ``/queue/data`` event stream for the same session hash until it ends.

    Args:
        img_url: Image URL; sent to the Space as ``{'path': img_url}``.

    Returns:
        The ``label`` of the first output item found in the last ``data:``
        segment of the event stream.

    Raises:
        requests.RequestException: On network failure, timeout, or an HTTP
            error status from either request.
        ValueError, KeyError, IndexError, TypeError: If the stream does not
            contain the expected JSON structure.
    """
    base_url = 'https://phelpsgg-falconsai-nsfw-image-detection.hf.space'
    # requests has no default timeout; without one a stalled Space would
    # block this call forever. (connect seconds, read seconds)
    timeout = (10, 120)
    session_hash = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(7))
    data = {
        'data': [
            {'path': img_url}
        ],
        'session_hash': session_hash,
        'fn_index': 0,
        'trigger_id': 9
    }
    r = requests.post(f'{base_url}/queue/join', json=data, timeout=timeout)
    r.raise_for_status()
    with requests.get(f'{base_url}/queue/data?session_hash={session_hash}', stream=True, timeout=timeout) as r:
        r.raise_for_status()
        # Join the raw bytes before decoding: decoding each 100-byte chunk on
        # its own fails when a multi-byte UTF-8 character straddles a boundary.
        buffer = b''.join(r.iter_content(100)).decode('utf-8')
    return json.loads(buffer.split('data:')[-1])["output"]["data"][0]['label']
def check_nsfw_final(img_url):
    """Classify an image, falling back through several remote checkers.

    Tries ``check_nsfw2``, ``check_nsfw3``, ``check_nsfw4`` and
    ``check_nsfw5`` in that order and uses the first one that returns
    without raising.

    Args:
        img_url: Image URL passed unchanged to each checker.

    Returns:
        ``True`` if the first successful checker returned the label
        ``'nsfw'``, ``False`` if it returned any other label, or ``None``
        if every checker raised.
    """
    print(img_url)
    for checker in (check_nsfw2, check_nsfw3, check_nsfw4, check_nsfw5):
        try:
            label = checker(img_url)
        except Exception as e:
            # A failing backend is expected; report it and try the next one.
            print(e)
            continue
        return label == 'nsfw'
    return None
def extract_frames(video_path, num_frames):
    """Save evenly spaced frames of a video as JPEG files in ``/tmp/gradio/``.

    Frame ``i`` is taken at position ``i * total_frames // num_frames``.
    Extraction stops at the first frame that cannot be read.

    Args:
        video_path: Path of the video, passed to ``cv2.VideoCapture``.
        num_frames: Number of frames to sample; coerced with ``int()``.
            Values of zero or below yield an empty list.

    Returns:
        A list of the generated file names (not full paths) that were
        written under ``/tmp/gradio/``.
    """
    num_frames = int(num_frames)
    if num_frames <= 0:
        return []
    frames_dir = '/tmp/gradio/'
    # cv2.imwrite does not create missing directories.
    os.makedirs(frames_dir, exist_ok=True)
    vidcap = cv2.VideoCapture(video_path)
    frames = []
    try:
        total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
        for i in range(num_frames):
            frame_number = i * total_frames // num_frames
            vidcap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
            success, image = vidcap.read()
            if not success:
                break
            random_filename = f"{uuid.uuid4().hex}.jpg"
            # imwrite reports failure through its return value; only list
            # files that were really written.
            if not cv2.imwrite(frames_dir + random_filename, image):
                continue
            frames.append(random_filename)
    finally:
        # Release the capture even if reading or writing raises.
        vidcap.release()
    return frames
def process_video(video, num_frames):
    """Sample frames from a video and classify each one.

    Frames are extracted with ``extract_frames``, each is checked through
    ``check_nsfw_final`` using its ``/file=/tmp/gradio/...`` URL on the
    ``sdafd-video-nsfw-detect`` Space, and the frame files are deleted
    afterwards.

    Args:
        video: Video path, passed to ``extract_frames``.
        num_frames: Number of frames to sample, passed to ``extract_frames``.

    Returns:
        A dict with ``nsfw_count`` (frames whose result was truthy),
        ``total_frames`` (frames extracted) and ``frames`` (a list of
        ``{"frame_path", "nsfw_detected"}`` dicts, one per frame).
    """
    frames = extract_frames(video, num_frames)
    frame_results = []
    try:
        for frame_path in frames:
            nsfw_detected = check_nsfw_final(f"https://sdafd-video-nsfw-detect.hf.space/file=/tmp/gradio/{frame_path}")
            frame_results.append({
                "frame_path": frame_path,
                "nsfw_detected": nsfw_detected
            })
    finally:
        # Cleanup runs even if classification raises, so frames do not
        # accumulate in the temp directory.
        for frame_path in frames:
            try:
                os.remove('/tmp/gradio/' + frame_path)
            except FileNotFoundError:
                pass
    return {
        "nsfw_count": sum(1 for entry in frame_results if entry["nsfw_detected"]),
        "total_frames": len(frames),
        "frames": frame_results
    }
# Gradio front end: a video upload and a frame-count field feed
# process_video, whose result dict is shown in a JSON panel.
# NOTE(review): the original indentation was lost; the Row is assumed to
# hold only the two input components - confirm against the deployed layout.
with gr.Blocks() as app:
    gr.Markdown("## NSFW Frame Detection in Video")

    with gr.Row():
        video_input = gr.Video(label="Upload Video")
        num_frames_input = gr.Number(
            label="Number of Frames to Check",
            value=10,
            precision=0,
        )

    output_json = gr.JSON(label="Result")
    submit_button = gr.Button("Submit")

    # Wire the button to the processing function.
    submit_button.click(
        process_video,
        inputs=[video_input, num_frames_input],
        outputs=output_json,
    )

app.launch()