import gradio as gr
import requests
import re
import os
import json
import time
import threading
from googleapiclient.discovery import build
from huggingface_hub import InferenceClient
from pytube import YouTube
import whisper
import logging
# Logging configuration
logging.basicConfig(level=logging.INFO)

# Load the Whisper speech-to-text model
model = whisper.load_model("base")

# YouTube Data API key (consider loading this from an environment variable
# instead of hardcoding it in source)
API_KEY = 'AIzaSyDUz3wkGal0ewRtPlzeMit88bV4hS4ZIVY'

# Build the YouTube Data API service
youtube = build('youtube', 'v3', developerKey=API_KEY)

# Hugging Face Inference API client
client = InferenceClient(model="meta-llama/Meta-Llama-3-70B-Instruct", token=os.getenv("HF_TOKEN"))

WEBHOOK_URL = "https://connect.pabbly.com/workflow/sendwebhookdata/IjU3NjUwNTZhMDYzMDA0MzA1MjZhNTUzMzUxM2Ii_pc"
COMMENTS_FILE = 'comments.json'

# System prompt (translated from Korean): the bot must introduce itself as
# 'GPTube', greet in Korean, and keep its Korean-language answer under 250 tokens.
DEFAULT_SYSTEM_PROMPT = ("When conversing, you must state that your name is 'GPTube' and greet in Korean. "
                         "You must generate and output your answer in Korean within 250 tokens. "
                         "Respond to the following YouTube comment in a friendly and helpful manner:")

stop_event = threading.Event()  # Event used to stop the background polling thread
def load_existing_comments():
    if os.path.exists(COMMENTS_FILE):
        with open(COMMENTS_FILE, 'r') as file:
            return json.load(file)
    return []


def save_comments(comments):
    with open(COMMENTS_FILE, 'w') as file:
        json.dump(comments, file)
def download_audio(video_url):
    yt = YouTube(video_url)
    audio = yt.streams.filter(only_audio=True).first()
    audio_path = audio.download(output_path=".")
    file_stats = os.stat(audio_path)
    logging.info(f'Size of audio file in Bytes: {file_stats.st_size}')
    if file_stats.st_size <= 30000000:  # Check the file size limit (~30 MB)
        base, ext = os.path.splitext(audio_path)
        new_file = base + '.mp3'
        os.rename(audio_path, new_file)
        return new_file
    else:
        logging.error('Videos for transcription on this space are limited to about 1.5 hours. Please contact support for more information.')
        return None
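# Note: os.rename above only relabels the downloaded file as .mp3; no transcoding
# takes place. Whisper decodes the actual container via ffmpeg, so this works anyway.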
def generate_transcript(audio_path):
    try:
        if not audio_path or not os.path.exists(audio_path):
            raise ValueError("Not a valid audio file path.")
        result = model.transcribe(audio_path)
        return result['text'].strip()
    except Exception as e:
        logging.error(f"Exception during transcription: {str(e)}")
        return f"An error occurred during transcription: {str(e)}"
def generate_reply(comment_text, system_prompt):
    prompt = f"{system_prompt}\n\nComment: {comment_text}\n\nReply:"
    response = client.text_generation(
        prompt=prompt,
        max_new_tokens=250,
        temperature=0.7,
        top_p=0.9
    )
    # text_generation normally returns a plain string; guard for a dict payload just in case.
    if isinstance(response, dict) and 'generated_text' in response:
        return response['generated_text']
    return response
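# Usage sketch (hypothetical comment text; actual output depends on the model):
#   reply = generate_reply("Great video, thanks!", DEFAULT_SYSTEM_PROMPT)
#   print(reply)  # expected: a short Korean reply that introduces itself as 'GPTube'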
def send_webhook(data):
    response = requests.post(WEBHOOK_URL, json=data)
    return response.status_code, response.text
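# Example payload (illustrative values; the keys mirror the webhook_data dict
# built in fetch_comments below):
#   send_webhook({
#       "comment_id": "Ugx...",
#       "author": "viewer123",
#       "published_at": "2024-01-01T00:00:00Z",
#       "text": "Great video!",
#       "reply_text": "...generated reply...",
#   })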
def get_video_comments(video_id):
    try:
        comments = []
        request = youtube.commentThreads().list(
            part='snippet',
            videoId=video_id,
            maxResults=100,  # number of comments fetched per page
            textFormat='plainText'
        )
        response = request.execute()
        while True:
            for item in response['items']:
                snippet = item['snippet']['topLevelComment']['snippet']
                comment = {
                    'comment_id': item['snippet']['topLevelComment']['id'],
                    'author': snippet['authorDisplayName'],
                    'published_at': snippet['publishedAt'],
                    'text': snippet['textDisplay'],
                    'reply_count': item['snippet']['totalReplyCount']
                }
                comments.append(comment)
            if 'nextPageToken' in response:
                request = youtube.commentThreads().list(
                    part='snippet',
                    videoId=video_id,
                    pageToken=response['nextPageToken'],
                    maxResults=100,  # number of comments fetched per page
                    textFormat='plainText'
                )
                response = request.execute()
            else:
                break
        return comments
    except Exception as e:
        return [{'error': str(e)}]
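# Quick sanity check (hypothetical video id): the function returns a list of
# comment dicts, or a one-element list carrying an 'error' key on failure:
#   comments = get_video_comments("dQw4w9WgXcQ")
#   if comments and 'error' in comments[0]:
#       logging.error(comments[0]['error'])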
def fetch_comments(video_url, system_prompt):
    log_entries = []
    video_id_match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', video_url)
    if video_id_match:
        video_id = video_id_match.group(1)
        audio_path = download_audio(video_url)
        if not audio_path:
            return "Could not download the audio."
        transcript = generate_transcript(audio_path)
        existing_comments = load_existing_comments()
        new_comments = get_video_comments(video_id)
        if not new_comments or 'error' in new_comments[0]:
            return "No comments were found, or an error occurred."
        # Keep only comments that have not been processed yet and have no replies.
        existing_ids = {c['comment_id'] for c in existing_comments}
        recent_new_comments = [c for c in new_comments
                               if c['comment_id'] not in existing_ids and c['reply_count'] == 0]
        if recent_new_comments:
            for most_recent_comment in recent_new_comments:
                # Prepend the transcript so replies can reference the video content.
                combined_prompt = f"{transcript}\n\n{system_prompt}"
                reply_text = generate_reply(most_recent_comment['text'], combined_prompt)
                webhook_data = {
                    "comment_id": most_recent_comment['comment_id'],
                    "author": most_recent_comment['author'],
                    "published_at": most_recent_comment['published_at'],
                    "text": most_recent_comment['text'],
                    "reply_text": reply_text
                }
                webhook_status, webhook_response = send_webhook(webhook_data)
                log_entries.append(
                    f"Recent comment: {most_recent_comment['text']}\n\n"
                    f"Generated reply: {reply_text}\n\n"
                    f"Webhook response: {webhook_status} - {webhook_response}"
                )
                existing_comments.append(most_recent_comment)
            save_comments(existing_comments)
        else:
            log_entries.append("No new comments.")
    else:
        log_entries.append("Invalid YouTube URL.")
    return "\n\n".join(log_entries)
def background_fetch_comments():
    while not stop_event.is_set():
        # Example URL and prompt; replace with the video to monitor.
        result = fetch_comments("https://www.youtube.com/watch?v=dQw4w9WgXcQ", DEFAULT_SYSTEM_PROMPT)
        print(result)
        time.sleep(10)


def start_background_fetch():
    threading.Thread(target=background_fetch_comments).start()


def stop_background_fetch():
    stop_event.set()
def get_text(video_url):
    audio_path = download_audio(video_url)
    if not audio_path:
        return "Could not download the audio."
    transcript = generate_transcript(audio_path)
    return transcript
# Define the Gradio interface
demo = gr.Blocks()

with demo:
    gr.Markdown("<h1><center>GPTube</center></h1>")
    with gr.Row():
        input_text_url = gr.Textbox(placeholder='YouTube video URL', label='YouTube URL')
        input_text_prompt = gr.Textbox(placeholder='System prompt', label='System prompt', value=DEFAULT_SYSTEM_PROMPT, lines=5)
    with gr.Row():
        result_button_transcribe = gr.Button('Transcribe')
        result_button_comments = gr.Button('Fetch Comments and Generate Reply')
    with gr.Row():
        output_text_transcribe = gr.Textbox(placeholder='Transcript of the YouTube video.', label='Transcript', lines=20)
        output_text_prompt = gr.Textbox(placeholder='Response text', label='Response text', lines=20)
    result_button_transcribe.click(get_text, inputs=input_text_url, outputs=output_text_transcribe, api_name="transcribe_api")
    result_button_comments.click(fetch_comments, inputs=[input_text_url, input_text_prompt], outputs=output_text_prompt, api_name="fetch_comments_api")

# Launch the interface
demo.launch()
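# The named endpoints can also be called remotely with gradio_client
# (a sketch; "user/gptube" is a hypothetical Space id):
#   from gradio_client import Client
#   remote = Client("user/gptube")
#   transcript = remote.predict("https://www.youtube.com/watch?v=dQw4w9WgXcQ", api_name="/transcribe_api")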