from huggingface_hub import snapshot_download
import gradio as gr
import openvino_genai
import librosa
import numpy as np
from threading import Lock, Event
from scipy.ndimage import uniform_filter1d
from queue import Queue, Empty
from googleapiclient.discovery import build
from concurrent.futures import ThreadPoolExecutor
import time
import cpuinfo
import gc
import os
import requests
from PIL import Image
from io import BytesIO
import openvino as ov
# Set CPU affinity for optimization
os.environ["GOMP_CPU_AFFINITY"] = "0-7"  # Use first 8 CPU cores
os.environ["OMP_NUM_THREADS"] = "8"
# Configuration constants (credentials are read from the environment rather
# than hardcoded, so the Space's secrets stay out of the source)
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")
GOOGLE_CSE_ID = os.environ.get("GOOGLE_CSE_ID", "")
DEFAULT_MAX_TOKENS = 100
DEFAULT_NUM_IMAGES = 1
MAX_HISTORY_TURNS = 2
MAX_TOKENS_LIMIT = 1000
# Download models
start_time = time.time()
snapshot_download(repo_id="OpenVINO/mistral-7b-instruct-v0.1-int8-ov", local_dir="mistral-ov")
snapshot_download(repo_id="OpenVINO/whisper-tiny-fp16-ov", local_dir="whisper-ov-model")
snapshot_download(repo_id="OpenVINO/InternVL2-1B-int8-ov", local_dir="internvl-ov")  # Used by the image analysis tab
print(f"Model download time: {time.time() - start_time:.2f} seconds")
# CPU-specific configuration: cpuinfo reports granular flags such as
# "avx512f", so match on the prefix rather than the bare family name
cpu_features = cpuinfo.get_cpu_info()['flags']
config_options = {}
if any(flag.startswith('avx512') for flag in cpu_features):
    config_options["ENFORCE_BF16"] = "YES"
    print("Using AVX512 optimizations")
elif 'avx2' in cpu_features:
    config_options["INFERENCE_PRECISION_HINT"] = "f32"
    print("Using AVX2 optimizations")
# Initialize models with performance flags
start_time = time.time()
mistral_pipe = openvino_genai.LLMPipeline(
    "mistral-ov",
    device="CPU",
    config={
        "PERFORMANCE_HINT": "THROUGHPUT",
        **config_options
    }
)
whisper_pipe = openvino_genai.WhisperPipeline(
    "whisper-ov-model",
    device="CPU"
)
pipe_lock = Lock()
print(f"Model initialization time: {time.time() - start_time:.2f} seconds")
# Initialize InternVL pipeline for image analysis (lazy loading)
internvl_pipe = None
internvl_lock = Lock()

def get_internvl_pipeline():
    global internvl_pipe
    with internvl_lock:
        if internvl_pipe is None:
            print("Initializing InternVL pipeline...")
            start_time = time.time()
            internvl_pipe = openvino_genai.VLMPipeline("internvl-ov", device="CPU")
            print(f"InternVL pipeline initialization time: {time.time() - start_time:.2f} seconds")
    return internvl_pipe
# Warm up models
print("Warming up models...")
start_time = time.time()
with pipe_lock:
    mistral_pipe.generate("Warmup", openvino_genai.GenerationConfig(max_new_tokens=10))
whisper_pipe.generate(np.zeros(16000, dtype=np.float32))
print(f"Model warmup time: {time.time() - start_time:.2f} seconds")
# Thread pool for background LLM generation
generation_executor = ThreadPoolExecutor(max_workers=4)
def fetch_images(query: str, num: int = DEFAULT_NUM_IMAGES) -> list:
    """Fetch unique images by requesting different result pages"""
    start_time = time.time()
    if num <= 0:
        return []
    try:
        service = build("customsearch", "v1", developerKey=GOOGLE_API_KEY)
        image_links = []
        seen_urls = set()  # Track unique URLs
        # Start from different positions to get unique images
        for start_index in range(1, num * 2, 2):  # Step by 2 to hit different pages
            if len(image_links) >= num:
                break
            res = service.cse().list(
                q=query,
                cx=GOOGLE_CSE_ID,
                searchType="image",
                num=1,              # One result per request
                start=start_index   # Offset into the result list
            ).execute()
            if "items" in res and res["items"]:
                item = res["items"][0]
                # Skip duplicates
                if item["link"] not in seen_urls:
                    image_links.append(item["link"])
                    seen_urls.add(item["link"])
        print(f"Unique image fetch time: {time.time() - start_time:.2f} seconds")
        return image_links[:num]  # Return only the requested number
    except Exception as e:
        print(f"Error in image fetching: {e}")
        return []
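
# A minimal alternative sketch: the CSE API's `num` parameter accepts up to
# 10 results per call, so the same links can usually be fetched with one
# request instead of one request per image. Kept as an uncalled reference,
# not wired into the UI.
def fetch_images_single_request(query: str, num: int = DEFAULT_NUM_IMAGES) -> list:
    if num <= 0:
        return []
    try:
        service = build("customsearch", "v1", developerKey=GOOGLE_API_KEY)
        res = service.cse().list(
            q=query, cx=GOOGLE_CSE_ID, searchType="image", num=min(num, 10)
        ).execute()
        # De-duplicate while preserving order
        return list(dict.fromkeys(item["link"] for item in res.get("items", [])))[:num]
    except Exception as e:
        print(f"Error in single-request image fetching: {e}")
        return []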
def process_audio(data, sr):
    start_time = time.time()
    data = librosa.to_mono(data.T) if data.ndim > 1 else data
    data = data.astype(np.float32)
    peak = np.max(np.abs(data))
    if peak == 0:  # Guard against silent input before normalizing
        return None
    data /= peak
    rms = librosa.feature.rms(y=data, frame_length=2048, hop_length=512)[0]
    smoothed_rms = uniform_filter1d(rms, size=5)
    speech_frames = np.where(smoothed_rms > 0.025)[0]
    if not speech_frames.size:
        print(f"Audio processing time: {time.time() - start_time:.2f} seconds")
        return None
    # Pad the detected speech region by 0.1 s on each side
    start = max(0, int(speech_frames[0] * 512 - 0.1 * sr))
    end = min(len(data), int((speech_frames[-1] + 1) * 512 + 0.1 * sr))
    print(f"Audio processing time: {time.time() - start_time:.2f} seconds")
    return data[start:end]
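
# Quick sanity check for the trimming logic, assuming a 16 kHz synthetic tone
# with half a second of leading silence; uncalled, for manual testing only.
def _process_audio_selftest():
    sr = 16000
    tone = np.sin(2 * np.pi * 440 * np.arange(sr) / sr).astype(np.float32)
    padded = np.concatenate([np.zeros(sr // 2, dtype=np.float32), tone])
    trimmed = process_audio(padded, sr)
    # Most of the leading silence should be cut, minus the 0.1 s padding
    assert trimmed is not None and len(trimmed) < len(padded)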
def transcribe(audio):
    start_time = time.time()
    if audio is None:
        print(f"Transcription time: {time.time() - start_time:.2f} seconds")
        return ""
    sr, data = audio
    processed = process_audio(data, sr)
    if processed is None or len(processed) < 1600:  # Skip very short clips
        print(f"Transcription time: {time.time() - start_time:.2f} seconds")
        return ""
    if sr != 16000:
        processed = librosa.resample(processed, orig_sr=sr, target_sr=16000)
    result = whisper_pipe.generate(processed)
    print(f"Transcription time: {time.time() - start_time:.2f} seconds")
    return str(result)  # WhisperPipeline returns a decoded-results object; coerce to text
def stream_answer(message: str, max_tokens: int):
    start_time = time.time()
    response_queue = Queue()
    completion_event = Event()
    error = [None]
    # Streaming is driven by the callback passed to generate(), not by
    # GenerationConfig fields
    optimized_config = openvino_genai.GenerationConfig(
        max_new_tokens=max_tokens,
        num_beams=1,
        do_sample=False,  # Sampling knobs below are inert while greedy decoding
        temperature=1.0,
        top_p=0.9,
        top_k=30
    )

    def callback(subword):
        # The streamer receives decoded text chunks as generation progresses
        response_queue.put(subword)
        return openvino_genai.StreamingStatus.RUNNING

    def generate():
        try:
            with pipe_lock:
                mistral_pipe.generate(message, optimized_config, callback)
        except Exception as e:
            error[0] = str(e)
        finally:
            completion_event.set()

    generation_executor.submit(generate)
    accumulated = []
    char_count = 0
    last_gc = time.time()
    while not completion_event.is_set() or not response_queue.empty():
        if error[0]:
            yield f"Error: {error[0]}"
            print(f"Stream answer time: {time.time() - start_time:.2f} seconds")
            return
        try:
            chunk = response_queue.get(timeout=0.05)  # Block briefly instead of busy-waiting
            accumulated.append(chunk)
            char_count += len(chunk)
            # Periodic garbage collection during long generations
            if time.time() - last_gc > 2.0:  # Every 2 seconds
                gc.collect()
                last_gc = time.time()
            yield "".join(accumulated)
        except Empty:
            continue
    elapsed = time.time() - start_time
    print(f"Generated {char_count} chars in {elapsed:.2f} seconds "
          f"({char_count / elapsed:.2f} chars/sec)")
    yield "".join(accumulated)
def run_chat(message: str, history: list, include_images: bool, max_tokens: int, num_images: int):
    start_time = time.time()
    final_text = ""
    # Create a placeholder entry for the streaming response
    history.append((message, "", []))
    rendered_history = render_history(history)
    yield rendered_history, gr.update(value="", interactive=False)
    # Stream tokens and update the chatbot in real time
    for output in stream_answer(message, max_tokens):
        final_text = output
        # Update only the last response in history
        updated_history = history[:-1] + [(message, final_text, [])]
        rendered_history = render_history(updated_history)
        yield rendered_history, gr.update(value="", interactive=False)
    images = []
    if include_images:
        images = fetch_images(message, num_images)
    # Update history with the final response and any images
    history[-1] = (message, final_text, images)
    if len(history) > MAX_HISTORY_TURNS:
        history = history[-MAX_HISTORY_TURNS:]
    rendered_history = render_history(history)
    print(f"Total chat time: {time.time() - start_time:.2f} seconds")
    yield rendered_history, gr.update(value="", interactive=True)
def render_history(history):
    rendered = []
    for user_msg, bot_msg, image_links in history:
        text = bot_msg
        if image_links:
            images_html = "".join(
                f"<img src='{url}' class='chat-image' onclick='showImage(\"{url}\")' />"
                for url in image_links
            )
            text += f"<br><br><b>📸 Related Visuals:</b><br><div style='display: flex; flex-wrap: wrap;'>{images_html}</div>"
        rendered.append((user_msg, text))
    return rendered
# ===== IMAGE ANALYSIS FUNCTIONS =====
def load_image(image_source):
    """Load an image from a file path, URL, or PIL Image and return an OpenVINO tensor"""
    if isinstance(image_source, str):
        if image_source.startswith(("http://", "https://")):
            # Load from URL
            response = requests.get(image_source, timeout=30)
            response.raise_for_status()
            image = Image.open(BytesIO(response.content)).convert("RGB")
        else:
            # Load from file path
            image = Image.open(image_source).convert("RGB")
    elif isinstance(image_source, Image.Image):
        # Already a PIL image
        image = image_source.convert("RGB")
    else:
        raise ValueError("Unsupported image input type")
    # Convert to an NHWC uint8 tensor (np.byte is signed int8 and would
    # corrupt pixel values above 127)
    image_data = np.asarray(image, dtype=np.uint8)[np.newaxis, ...]
    return ov.Tensor(image_data)
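
# Uncalled shape sanity check, assuming any reachable RGB image URL:
def _load_image_selftest(url: str):
    tensor = load_image(url)
    n, h, w, c = tensor.shape
    assert n == 1 and c == 3, "expected an NHWC tensor with 3 channels"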
def analyze_image(image, url, prompt):
    try:
        # Determine image source (priority: uploaded image > URL)
        image_source = image if image is not None else url
        if not image_source:
            return "⚠️ Please upload an image or enter an image URL"
        # Convert to OpenVINO tensor
        image_tensor = load_image(image_source)
        # Get pipeline (lazy initialization)
        pipe = get_internvl_pipeline()
        # Generate response with thread safety
        with internvl_lock:
            pipe.start_chat()
            output = pipe.generate(prompt, image=image_tensor, max_new_tokens=100)
            pipe.finish_chat()
        return str(output)
    except Exception as e:
        return f"❌ Error: {str(e)}"
# ===== GRADIO INTERFACE =====
css = """
.processing {
    animation: pulse 1.5s infinite;
    color: #4a5568;
    padding: 10px;
    border-radius: 5px;
    text-align: center;
    margin: 10px 0;
}
@keyframes pulse {
    0%, 100% { opacity: 1; }
    50% { opacity: 0.5; }
}
.chat-image {
    cursor: pointer;
    transition: transform 0.2s;
    max-height: 100px;
    margin: 4px;
    border-radius: 8px;
    box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.chat-image:hover {
    transform: scale(1.05);
    box-shadow: 0 4px 8px rgba(0,0,0,0.2);
}
.modal {
    position: fixed;
    top: 0;
    left: 0;
    width: 100%;
    height: 100%;
    background: rgba(0,0,0,0.8);
    display: none;
    z-index: 1000;
    cursor: zoom-out;
}
.modal-content {
    position: absolute;
    top: 50%;
    left: 50%;
    transform: translate(-50%, -50%);
    max-width: 90%;
    max-height: 90%;
    background: white;
    padding: 10px;
    border-radius: 12px;
}
.modal-img {
    width: auto;
    height: auto;
    max-width: 100%;
    max-height: 100%;
    border-radius: 8px;
}
.chat-container {
    border: 1px solid #e5e7eb;
    border-radius: 12px;
    padding: 20px;
    margin-bottom: 20px;
}
.slider-container {
    margin-top: 20px;
    padding: 15px;
    border-radius: 10px;
    background-color: #f8f9fa;
}
.slider-label {
    font-weight: bold;
    margin-bottom: 5px;
}
.system-info {
    background-color: #7B9BDB;
    padding: 15px;
    border-radius: 8px;
    margin: 15px 0;
    border-left: 4px solid #1890ff;
}
.typing-indicator {
    display: inline-block;
    position: relative;
    width: 40px;
    height: 20px;
}
.typing-dot {
    display: inline-block;
    width: 6px;
    height: 6px;
    border-radius: 50%;
    background-color: #4a5568;
    position: absolute;
    animation: typing 1.4s infinite ease-in-out;
}
.typing-dot:nth-child(1) {
    left: 0;
    animation-delay: 0s;
}
.typing-dot:nth-child(2) {
    left: 12px;
    animation-delay: 0.2s;
}
.typing-dot:nth-child(3) {
    left: 24px;
    animation-delay: 0.4s;
}
@keyframes typing {
    0%, 60%, 100% { transform: translateY(0); }
    30% { transform: translateY(-5px); }
}
.tab-container {
    border-radius: 12px;
    padding: 20px;
    background: #3fc9f8;
    box-shadow: 0 4px 6px rgba(0,0,0,0.05);
    margin-bottom: 20px;
}
.tab-header {
    font-size: 24px;
    margin-bottom: 20px;
    padding-bottom: 10px;
    border-bottom: 2px solid #e5e7eb;
}
"""
with gr.Blocks(css=css, title="EDU Chat by Phanindra Reddy K") as demo:
    gr.Markdown("# 🤖 EDU CHAT BY PHANINDRA REDDY K")
    # System info banner
    gr.HTML("""
    <div class="system-info">
        <strong>Multi-Modal AI Assistant</strong>
        <ul>
            <li>Text & Voice Chat with Mistral-7B</li>
            <li>Image Understanding with InternVL</li>
            <li>Optimized for High-RAM Systems</li>
        </ul>
    </div>
    """)
    modal_html = """
    <div class="modal" id="imageModal" onclick="this.style.display='none'">
        <div class="modal-content">
            <img class="modal-img" id="expandedImg">
        </div>
    </div>
    <script>
    function showImage(url) {
        document.getElementById('expandedImg').src = url;
        document.getElementById('imageModal').style.display = 'block';
    }
    </script>
    """
    gr.HTML(modal_html)
    # Create tabs for different functionalities
    with gr.Tabs():
        # ===== MAIN CHAT TAB =====
        with gr.Tab("💬 Chat Assistant", id="chat_tab"):
            state = gr.State([])
            with gr.Column(scale=2, elem_classes="chat-container"):
                chatbot = gr.Chatbot(label="Conversation", height=500, bubble_full_width=False)
            with gr.Column(scale=1):
                gr.Markdown("### 💬 Ask Your Question")
                with gr.Row():
                    user_input = gr.Textbox(
                        placeholder="Type your question here...",
                        label="",
                        container=False,
                        elem_id="question-input"
                    )
                    include_images = gr.Checkbox(
                        label="Include Visuals",
                        value=True,
                        container=False,
                        elem_id="image-checkbox"
                    )
                # Generation settings sliders
                with gr.Column(elem_classes="slider-container"):
                    gr.Markdown("### ⚙️ Generation Settings")
                    with gr.Row():
                        max_tokens = gr.Slider(
                            minimum=10,
                            maximum=MAX_TOKENS_LIMIT,
                            value=DEFAULT_MAX_TOKENS,
                            step=10,
                            label="Response Length (Tokens)",
                            info=f"Max: {MAX_TOKENS_LIMIT} tokens (for detailed explanations)",
                            elem_classes="slider-label"
                        )
                    # Conditionally visible image slider row
                    with gr.Row(visible=True) as image_slider_row:
                        num_images = gr.Slider(
                            minimum=0,
                            maximum=5,
                            value=DEFAULT_NUM_IMAGES,
                            step=1,
                            label="Number of Images",
                            info="Set to 0 to disable images",
                            elem_classes="slider-label"
                        )
                with gr.Row():
                    submit_btn = gr.Button("Send Text", variant="primary")
                    mic_btn = gr.Button("Transcribe Voice", variant="secondary")
                mic = gr.Audio(
                    sources=["microphone"],
                    type="numpy",
                    label="Voice Input",
                    show_label=False,
                    elem_id="voice-input"
                )
                processing = gr.HTML("""
                <div id="processing" style="display: none;">
                    <div class="processing">🔮 Processing your request...</div>
                </div>
                """)
            # Toggle image slider visibility based on the checkbox
            def toggle_image_slider(include_visuals):
                return gr.update(visible=include_visuals)

            include_images.change(
                fn=toggle_image_slider,
                inputs=include_images,
                outputs=image_slider_row
            )

            def toggle_processing():
                return gr.update(visible=True), gr.update(interactive=False)

            def hide_processing():
                return gr.update(visible=False), gr.update(interactive=True)
            # Submit handler: show the spinner, stream the chat, then restore the UI
            submit_btn.click(
                fn=toggle_processing,
                outputs=[processing, submit_btn]
            ).then(
                fn=run_chat,
                inputs=[user_input, state, include_images, max_tokens, num_images],
                outputs=[chatbot, user_input]
            ).then(
                fn=hide_processing,
                outputs=[processing, submit_btn]
            )
            # Voice transcription
            mic_btn.click(
                fn=toggle_processing,
                outputs=[processing, mic_btn]
            ).then(
                fn=transcribe,
                inputs=mic,
                outputs=user_input
            ).then(
                fn=hide_processing,
                outputs=[processing, mic_btn]
            )
        # ===== IMAGE ANALYSIS TAB =====
        with gr.Tab("🖼️ Image Analysis", id="image_tab"):
            with gr.Column(elem_classes="tab-container"):
                gr.Markdown("## 🖼️ Image Understanding with InternVL")
                gr.Markdown("Upload an image or enter a URL, then ask questions about it")
                with gr.Row():
                    with gr.Column():
                        # Image upload
                        image_upload = gr.Image(type="pil", label="Upload Image")
                        # URL input
                        url_input = gr.Textbox(
                            label="OR Enter Image URL",
                            placeholder="https://example.com/image.jpg",
                            info="Enter a direct image URL"
                        )
                        # Preview image
                        preview = gr.Image(label="Preview", interactive=False)

                        # Update the preview when either input changes
                        def update_preview(img, url):
                            if img is not None:
                                return img
                            elif url and url.startswith(("http://", "https://")):
                                return url
                            return None

                        image_upload.change(update_preview, [image_upload, url_input], preview)
                        url_input.change(update_preview, [image_upload, url_input], preview)
                    with gr.Column():
                        # Question input
                        prompt = gr.Textbox(
                            label="Question",
                            placeholder="What is unusual in this image?",
                            info="Ask anything about the image"
                        )
                        # Submit button
                        img_submit_btn = gr.Button("Ask Question", variant="primary")
                        # Output
                        img_output = gr.Textbox(label="Model Response", interactive=False)
                # Submit action
                img_submit_btn.click(
                    fn=analyze_image,
                    inputs=[image_upload, url_input, prompt],
                    outputs=img_output
                )
if __name__ == "__main__":
    demo.launch(share=True, debug=True)