import asyncio
import base64
import gc
import os
import tempfile
import time
from datetime import datetime
from typing import Optional

import streamlit as st
import yaml
from crewai import Agent, Crew, Process, Task, LLM
from crewai_tools import FileReadTool
from dotenv import load_dotenv
from tqdm import tqdm

from crawl4ai_scrapper import scrape_multiple_channels

# Load variables from a local .env file into the process environment.
load_dotenv()
class CerebrasLLM(LLM):
    """CrewAI ``LLM`` subclass that answers prompts through a LlamaIndex Cerebras client."""

    def __init__(self, model: str, api_key: str, base_url: str, **kwargs):
        """Initialise the CrewAI base class and create the Cerebras client.

        Args:
            model: Model identifier passed to the base class and to the client.
            api_key: API key passed to the base class and to the client.
            base_url: Endpoint URL passed to the base class and to the client.
            **kwargs: Extra options forwarded unchanged to both the base
                ``LLM`` initializer and the ``Cerebras`` constructor.
        """
        # Run the base initializer so the attributes the framework reads from
        # an LLM instance exist on this subclass as well.
        super().__init__(model=model, api_key=api_key, base_url=base_url, **kwargs)

        # Local import: the dependency is only needed once an instance is built.
        from llama_index.llms.cerebras import Cerebras

        self.client = Cerebras(
            model=model,
            api_key=api_key,
            base_url=base_url,
            **kwargs
        )

    def generate(self, prompt: str, **kwargs) -> str:
        """Return the completion text for ``prompt``.

        Args:
            prompt: Prompt text sent to the client's ``complete`` method.
            **kwargs: Forwarded unchanged to ``complete``.
        """
        response = self.client.complete(prompt, **kwargs)
        return response.text

    def call(self, messages, *args, **kwargs) -> str:
        """Answer ``messages`` with the Cerebras client instead of the base backend.

        Args:
            messages: A plain prompt string, or a list of chat messages.
                NOTE(review): list items are assumed to be dicts with
                ``role`` and ``content`` keys, as in CrewAI's ``call``
                contract - confirm against the installed CrewAI version.
            *args: Accepted for signature compatibility and ignored.
            **kwargs: Accepted for signature compatibility and ignored; they
                are not forwarded because ``complete`` takes different options.

        Returns:
            The completion text produced by :meth:`generate`.
        """
        if isinstance(messages, str):
            prompt = messages
        else:
            # Flatten the chat history into one prompt for the completion API.
            prompt = "\n\n".join(
                f"{message.get('role', 'user')}: {message.get('content', '')}"
                for message in messages
            )
        return self.generate(prompt)
@st.cache_resource
def load_llm() -> CerebrasLLM:
    """Build the Cerebras-backed LLM; wrapped in ``st.cache_resource``.

    Returns:
        A ``CerebrasLLM`` configured for the ``llama-3.3-70b`` model.

    Raises:
        RuntimeError: If the ``CEREBRAS_API_KEY`` environment variable is
            missing or empty.
    """
    api_key = os.getenv("CEREBRAS_API_KEY")
    if not api_key:
        # Fail here with a clear message instead of passing None as the key.
        raise RuntimeError(
            "CEREBRAS_API_KEY is not set; define it in the environment or in a .env file"
        )
    return CerebrasLLM(
        model="llama-3.3-70b",
        api_key=api_key,
        base_url="https://api.cerebras.ai/v1",
        temperature=0.7,
        max_tokens=4096,
        top_p=0.95,
        timeout=30,
    )
class YouTubeAnalyzer:
    """Builds the two-agent, two-task crew described by ``config.yaml``."""

    def __init__(self):
        """Create the file-reading tool and obtain the LLM from ``load_llm``."""
        self.docs_tool = FileReadTool()
        self.llm = load_llm()

    def create_crew(self) -> "Crew":
        """Read ``config.yaml`` and assemble the analysis/synthesis crew.

        The configuration must contain ``agents`` and ``tasks`` lists with at
        least two entries each. Agent entries supply ``role``, ``goal`` and
        ``backstory``; task entries supply ``description`` and
        ``expected_output``. Index 0 is the analysis step, index 1 the
        synthesis step.

        Returns:
            A ``Crew`` that runs the analysis task and then the synthesis
            task sequentially. The tasks write ``analysis_raw.md`` and
            ``final_report.md`` respectively.

        Raises:
            OSError: If ``config.yaml`` cannot be opened.
            KeyError, IndexError: If the configuration lacks the expected
                entries.
        """
        # Explicit encoding so the config reads the same on every platform.
        with open("config.yaml", "r", encoding="utf-8") as file:
            config = yaml.safe_load(file)

        analysis_cfg, synthesis_cfg = config["agents"][0], config["agents"][1]
        analysis_task_cfg, synthesis_task_cfg = config["tasks"][0], config["tasks"][1]

        analysis_agent = Agent(
            role=analysis_cfg["role"],
            goal=analysis_cfg["goal"],
            backstory=analysis_cfg["backstory"],
            verbose=True,
            tools=[self.docs_tool],
            llm=self.llm,
            memory=True,
        )
        synthesis_agent = Agent(
            role=synthesis_cfg["role"],
            goal=synthesis_cfg["goal"],
            backstory=synthesis_cfg["backstory"],
            verbose=True,
            llm=self.llm,
            allow_delegation=False,
        )

        analysis_task = Task(
            description=analysis_task_cfg["description"],
            expected_output=analysis_task_cfg["expected_output"],
            agent=analysis_agent,
            output_file="analysis_raw.md",
        )
        synthesis_task = Task(
            description=synthesis_task_cfg["description"],
            expected_output=synthesis_task_cfg["expected_output"],
            agent=synthesis_agent,
            # The synthesis step receives the analysis task's output as context.
            context=[analysis_task],
            output_file="final_report.md",
        )

        return Crew(
            agents=[analysis_agent, synthesis_agent],
            tasks=[analysis_task, synthesis_task],
            process=Process.sequential,
            # NOTE(review): recent CrewAI releases appear to type ``verbose``
            # as a boolean, which would reject the integer level 2 - confirm
            # against the installed version.
            verbose=True,
        )
class StreamlitApp:
    """Streamlit UI: collects channel URLs and dates, runs the crew, shows the report."""

    def __init__(self):
        """Create the analyzer and make sure the session-state keys exist."""
        self.analyzer = YouTubeAnalyzer()
        self._init_session_state()

    def _init_session_state(self):
        """Seed ``response``, ``crew`` and ``youtube_channels`` if they are missing."""
        defaults = {"response": None, "crew": None, "youtube_channels": [""]}
        for key, value in defaults.items():
            if key not in st.session_state:
                st.session_state[key] = value

    @staticmethod
    def _add_channel():
        """Button callback: append an empty channel row."""
        st.session_state.youtube_channels.append("")

    @staticmethod
    def _remove_channel(index):
        """Button callback: drop the channel row at ``index``.

        The text inputs are keyed by position (``channel_<i>``). Their current
        values are copied into the channel list and the keyed widget state is
        cleared, so the remaining rows are re-seeded from the list after the
        removal instead of showing the value that used to sit at that position.
        """
        channels = st.session_state.youtube_channels
        for position in range(len(channels)):
            widget_key = f"channel_{position}"
            if widget_key in st.session_state:
                channels[position] = st.session_state[widget_key]
                del st.session_state[widget_key]
        if 0 <= index < len(channels):
            channels.pop(index)

    def _setup_sidebar(self):
        """Render the sidebar: channel rows, date range and the start button."""
        with st.sidebar:
            st.header("YouTube Analysis Configuration")
            channels = st.session_state.youtube_channels
            for i, channel in enumerate(channels):
                cols = st.columns([6, 1])
                with cols[0]:
                    # Store what the user typed; the analysis step reads the
                    # URLs from this list, not from the widgets.
                    channels[i] = st.text_input(
                        "Channel URL",
                        value=channel,
                        key=f"channel_{i}",
                        help="Example: https://www.youtube.com/@ChannelName",
                    )
                with cols[1]:
                    # The first row has no remove button, so one row always remains.
                    if i > 0:
                        st.button(
                            "β",
                            key=f"remove_{i}",
                            on_click=self._remove_channel,
                            args=(i,),
                        )
            st.button("Add Channel β", on_click=self._add_channel)
            st.divider()
            st.subheader("Analysis Period")
            self.start_date = st.date_input("Start Date", key="start_date")
            self.end_date = st.date_input("End Date", key="end_date")
            st.divider()
            if st.button("π Start Analysis", type="primary"):
                self._trigger_analysis()

    def _trigger_analysis(self):
        """Scrape the valid channels, save their transcripts and run the crew.

        Shows an error and returns early when no channel URL passes
        ``_is_valid_youtube_url`` or when the start date is after the end
        date. Any exception raised while scraping, saving or running the crew
        is reported with ``st.error`` and the script run is stopped.
        """
        with st.spinner('Initializing deep content analysis...'):
            try:
                valid_urls = [
                    url.strip()
                    for url in st.session_state.youtube_channels
                    if self._is_valid_youtube_url(url)
                ]
                if not valid_urls:
                    st.error("Please provide at least one valid YouTube channel URL")
                    return
                if self.start_date > self.end_date:
                    st.error("Start Date must not be later than End Date")
                    return

                channel_data = asyncio.run(
                    scrape_multiple_channels(
                        valid_urls,
                        start_date=self.start_date.strftime("%Y-%m-%d"),
                        end_date=self.end_date.strftime("%Y-%m-%d"),
                    )
                )
                self._save_transcripts(channel_data)

                with st.spinner('Running AI-powered analysis...'):
                    st.session_state.crew = self.analyzer.create_crew()
                    st.session_state.response = st.session_state.crew.kickoff(
                        inputs={"files": st.session_state.all_files}
                    )
            except Exception as e:
                st.error(f"Analysis failed: {str(e)}")
                st.stop()

    def _save_transcripts(self, channel_data):
        """Write one transcript file per video and record the paths.

        Args:
            channel_data: Iterable of channels, each a sized collection of
                video dicts. Each video is read for ``id`` and ``transcript``;
                each transcript segment is read for ``start``, ``end`` and
                ``text``.

        Side effects:
            Creates ``transcripts/`` if needed, writes ``transcripts/<id>.txt``
            for every video and stores the list of written paths in
            ``st.session_state.all_files``.
        """
        st.session_state.all_files = []
        os.makedirs("transcripts", exist_ok=True)
        total_videos = sum(len(ch) for ch in channel_data)
        with tqdm(total=total_videos, desc="Processing Videos") as pbar:
            for channel in channel_data:
                for video in channel:
                    file_path = f"transcripts/{video['id']}.txt"
                    # Explicit UTF-8 so non-ASCII transcript text can be
                    # written regardless of the platform's default encoding.
                    with open(file_path, "w", encoding="utf-8") as f:
                        f.write("\n".join(
                            f"[{seg['start']}-{seg['end']}] {seg['text']}"
                            for seg in video['transcript']
                        ))
                    st.session_state.all_files.append(file_path)
                    pbar.update(1)

    def _display_results(self):
        """Show the stored report with a download button and a reset button."""
        # The download button takes text or bytes, so use the string form of
        # whatever kickoff returned.
        report = str(st.session_state.response)
        st.markdown("## Analysis Report")
        with st.expander("View Full Technical Analysis"):
            st.markdown(report)
        col1, col2 = st.columns([3, 1])
        with col1:
            st.download_button(
                label="π₯ Download Full Report",
                data=report,
                file_name="youtube_analysis_report.md",
                mime="text/markdown",
            )
        with col2:
            if st.button("π New Analysis"):
                gc.collect()
                st.session_state.response = None
                st.rerun()

    @staticmethod
    def _is_valid_youtube_url(url: str) -> bool:
        """Return True when ``url`` contains ``youtube.com/`` or ``youtu.be/``."""
        return any(pattern in url for pattern in ["youtube.com/", "youtu.be/"])

    def run(self):
        """Render the page: title, sidebar, then the report or a hint to start."""
        st.set_page_config(page_title="YouTube Intelligence System", layout="wide")
        st.title("YouTube Content Analysis Platform")
        st.markdown("---")
        self._setup_sidebar()
        if st.session_state.response:
            self._display_results()
        else:
            st.info("Configure analysis parameters in the sidebar to begin")