import gradio as gr
from transformers import LlavaOnevisionProcessor, LlavaOnevisionForConditionalGeneration, TextIteratorStreamer
from threading import Thread
import re
import time 
from PIL import Image
import torch
import cv2
import spaces

model_id = "llava-hf/llava-onevision-qwen2-0.5b-ov-hf"

processor = LlavaOnevisionProcessor.from_pretrained(model_id)

model = LlavaOnevisionForConditionalGeneration.from_pretrained(model_id, torch_dtype=torch.float16)
model.to("cuda")

def sample_frames(video_file, num_frames):
    video = cv2.VideoCapture(video_file)
    total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    interval = total_frames // num_frames
    frames = []
    for i in range(total_frames):
        ret, frame = video.read()
        pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        if not ret:
            continue
        if i % interval == 0:
            frames.append(pil_img)
    video.release()
    return frames

@spaces.GPU
def bot_streaming(message, history):

  txt = message.text
  ext_buffer = f"user\n{txt} assistant"

  if message.files:
    if len(message.files) == 1:
      image = [message.files[0].path]
    # interleaved images or video
    elif len(message.files) > 1:
      image = [msg.path for msg in message.files]
  else:
    # if there's no image uploaded for this turn, look for images in the past turns
    # kept inside tuples, take the last one
    for hist in history:
      if type(hist[0])==tuple:
        image = hist[0][0]

  if message.files is None:
      gr.Error("You need to upload an image or video for LLaVA to work.")
      
  video_extensions = ("avi", "mp4", "mov", "mkv", "flv", "wmv", "mjpeg")
  image_extensions = Image.registered_extensions()
  image_extensions = tuple([ex for ex, f in image_extensions.items()])
  if len(image) == 1:
    if image[0].endswith(video_extensions):
        
        video = sample_frames(image[0], 32)
        image = None
        prompt = f"<|im_start|>user <video>\n{message.text}<|im_end|><|im_start|>assistant"
    elif image[0].endswith(image_extensions):
        image = Image.open(image[0]).convert("RGB")
        video = None
        prompt = f"<|im_start|>user <image>\n{message.text}<|im_end|><|im_start|>assistant"

  elif len(image) > 1:
    image_list = []
    user_prompt = message.text

    for img in image:
      if img.endswith(image_extensions):
        img = Image.open(img).convert("RGB")
        image_list.append(img)

      elif img.endswith(video_extensions):        
        frames = sample_frames(img, 6)
        for frame in frames:
          image_list.append(frame)
      
    toks = "<image>" * len(image_list)
    prompt = "<|im_start|>user"+ toks + f"\n{user_prompt}<|im_end|><|im_start|>assistant"

    image = image_list
    video = None


  inputs = processor(text=prompt, images=image, videos=video, return_tensors="pt").to("cuda", torch.float16)
  streamer = TextIteratorStreamer(processor, **{"max_new_tokens": 200, "skip_special_tokens": True})
  generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=100)
  generated_text = ""

  thread = Thread(target=model.generate, kwargs=generation_kwargs)
  thread.start()

  

  buffer = ""
  for new_text in streamer:
    
    buffer += new_text
    
    generated_text_without_prompt = buffer[len(ext_buffer):]
    time.sleep(0.01)
    yield generated_text_without_prompt


demo = gr.ChatInterface(fn=bot_streaming, title="LLaVA Onevision", examples=[
     {"text": "Do the cats in these two videos have same breed? What breed is each cat?", "files":["./cats_1.mp4", "./cats_2.mp4"]},
    {"text": "These are the tech specs of two laptops I am choosing from. Which one should I choose for office work?", "files":["./dell-tech-specs.jpeg", "./asus-tech-specs.png"]},
     {"text": "Here are several images from a cooking book, showing how to prepare a meal step by step. Can you write a recipe for the meal, describing each step in details?", "files":["./step0.png", "./step1.png", "./step2.png", "./step3.png", "./step4.png", "./step5.png"]}, 

    {"text": "What is on the flower?", "files":["./bee.jpg"]},
    {"text": "This is a video explaining how to create a Presentation in GoogleSlides. Can you write down what I should do step by step, following the video?", "files":["./tutorial.mp4"]}], 
      textbox=gr.MultimodalTextbox(file_count="multiple"), 
      description="Try [LLaVA Onevision](https://huggingface.co/docs/transformers/main/en/model_doc/llava_onevision) in this demo (more specifically, the [Qwen-2-0.5B-Instruct variant](https://huggingface.co/llava-hf/llava-onevision-qwen2-0.5b-ov-hf)). Upload an image or a video, and start chatting about it, or simply try one of the examples below. If you don't upload an image, you will receive an error. ",
      stop_btn="Stop Generation", multimodal=True)
demo.launch(debug=True)