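"""Gradio app that samples a few frames from a YouTube video with yt-dlp,
grabs them with OpenCV, and captions each frame with the BLIP
image-captioning model."""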
import gradio as gr
from PIL import Image
import yt_dlp
from yt_dlp.utils import download_range_func
import cv2
import tempfile
from transformers import BlipProcessor, BlipForConditionalGeneration

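# Load the BLIP captioning model once at startup; the weights are fetched from the
# Hugging Face Hub on first run and cached afterwards.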
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")

def get_video_frame(youtube_url, seek_time=0):
    # Download a short clip (about one second) of the video and grab a frame around seek_time
    with tempfile.NamedTemporaryFile(suffix=".mp4") as temp_video:
        ydl_opts = {
            'outtmpl': temp_video.name,
            'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/mp4',
            'quiet': True,
            'noplaylist': True,
            # yt-dlp's Python API expects a callable here, not the CLI-style "*start-end" string
            'download_ranges': download_range_func(None, [(seek_time, seek_time + 1)]),
            # the temp file already exists, so yt-dlp must be allowed to overwrite it
            'overwrites': True,
            'retries': 3,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            try:
                ydl.download([youtube_url])
            except Exception as e:
                return None, f"Download error: {str(e)}"

        # Grab a frame from the downloaded clip
        vidcap = cv2.VideoCapture(temp_video.name)
        vidcap.set(cv2.CAP_PROP_POS_MSEC, 500)  # seek roughly to the middle of the ~1 second clip
        success, image = vidcap.read()
        vidcap.release()
        if success:
            # OpenCV returns BGR frames; convert to RGB before building a PIL image
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            pil_image = Image.fromarray(image)
            return pil_image, None
        else:
            return None, "Could not read a frame."

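# Sample `num_frames` frames, `interval` seconds apart, and caption each one with BLIP.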
def analyse_stream(youtube_url, interval=10, num_frames=3):
    results = []
    for i in range(num_frames):
        seek = i * interval
        img, err = get_video_frame(youtube_url, seek)
        if err or img is None:
            results.append((f"Fout: {err}", None))
            continue
        # Caption
        inputs = processor(images=img, return_tensors="pt")
        out = model.generate(**inputs)
        caption = processor.decode(out[0], skip_special_tokens=True)
        results.append((caption, img))
    return results

def gradio_multi(youtube_url):
    res = analyse_stream(youtube_url, interval=10, num_frames=3)
    # The Dataframe component expects rows (a list of lists), not a flat list of strings
    rows = [[r[0]] for r in res]
    # Skip failed frames so the Gallery only receives valid images
    imgs = [r[1] for r in res if r[1] is not None]
    return rows, imgs

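# Gradio UI: a URL textbox, a button that runs the analysis, a table of captions,
# and a gallery showing the sampled frames.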
with gr.Blocks() as demo:
    gr.Markdown("# 🎥 YouTube livestream analyse (meerdere frames)")
    youtube_url = gr.Textbox(label="YouTube URL", value="https://www.youtube.com/watch?v=R5i7aeV8SB8")
    run_btn = gr.Button("Analyseer 3 beelden (om de 10 sec)")
    output = gr.Dataframe(label="Model antwoorden", headers=["Beschrijving"])
    images = gr.Gallery(label="Frames")

    run_btn.click(gradio_multi, inputs=youtube_url, outputs=[output, images])

demo.launch()