# Spaces: Sleeping -- hosting-page status banner captured together with the source; not part of the program.
import base64
import glob
import json
import os
import tempfile
from typing import List, Dict, Tuple, Optional

import imageio
import streamlit as st
import streamlit.components.v1 as components
from PIL import Image, ImageDraw, ImageFont
# Page-level Streamlit configuration, issued before any other st.* call in this file.
st.set_page_config(
    layout="wide",
    page_icon="🖥️",
    page_title="Computer Use Interaction Visualizer",
)

# Constants
# Root folder that holds the <category>/<website>/<session> hierarchy.
DATA_ROOT = "website_data_v2"
# Action types meant to be left out of the before/after slideshow.
ACTION_PAIRS_TO_SKIP = {"CURSOR_POSITIONING", "KEYBOARD_INPUT_MOVE_START"}
def scan_data_directory(data_root: Optional[str] = None) -> Dict:
    """
    Scans the data directory structure and returns a dictionary
    representing the hierarchy of categories, websites, and sessions.

    Args:
        data_root: Directory to scan. When omitted, the module-level
            ``DATA_ROOT`` is used.

    Returns:
        A mapping ``{category: {website: [session, ...]}}``. A session is
        listed only when ``<session>/<session>/screenshots`` is a directory.
        Categories, websites and sessions are returned in sorted order.
        An empty dict is returned when the root does not exist or cannot
        be read, so the caller can report "no data" instead of crashing.
    """
    root = DATA_ROOT if data_root is None else data_root

    def subdirectories(path: str) -> List[str]:
        # A folder that is missing or unreadable is treated as empty so one
        # bad directory cannot take the whole scan down.
        try:
            names = sorted(os.listdir(path))
        except OSError:
            return []
        return [name for name in names if os.path.isdir(os.path.join(path, name))]

    data_structure: Dict = {}
    for category in subdirectories(root):
        category_path = os.path.join(root, category)
        websites: Dict[str, List[str]] = {}
        for website in subdirectories(category_path):
            website_path = os.path.join(category_path, website)
            # Sessions are stored as <session>/<session>/screenshots; keep
            # only the ones that actually have that nested layout.
            websites[website] = [
                session
                for session in subdirectories(website_path)
                if os.path.isdir(
                    os.path.join(website_path, session, session, "screenshots")
                )
            ]
        data_structure[category] = websites
    return data_structure
def load_session_data(
    category: str,
    website: str,
    session: str,
    data_root: Optional[str] = None,
) -> Tuple[Dict, Dict, List[str]]:
    """
    Loads all the data for a specific session including metadata, actions, and screenshots.

    Args:
        category: Category folder name.
        website: Website folder name inside the category.
        session: Session folder name inside the website.
        data_root: Root data directory. When omitted, the module-level
            ``DATA_ROOT`` is used.

    Returns:
        ``(metadata, action_data, screenshots)`` where ``metadata`` is the
        parsed ``metadata.json``, ``action_data`` is the parsed
        ``action_log.json`` and ``screenshots`` is the list of
        ``frame_*.png`` file names (no directory part) ordered by frame
        number.

    Raises:
        OSError: If ``metadata.json`` or ``action_log.json`` cannot be opened.
        json.JSONDecodeError: If either file is not valid JSON.
    """
    root = DATA_ROOT if data_root is None else data_root

    # Paths to data files
    session_path = os.path.join(root, category, website, session)
    nested_session_path = os.path.join(session_path, session)
    screenshots_path = os.path.join(nested_session_path, "screenshots")

    # Read the JSON files as bytes so the json module detects the encoding
    # itself instead of depending on the platform's locale encoding.
    with open(os.path.join(session_path, "metadata.json"), "rb") as f:
        metadata = json.load(f)
    with open(os.path.join(nested_session_path, "action_log.json"), "rb") as f:
        action_data = json.load(f)

    # Escape the directory part so folder names containing glob
    # metacharacters ("[", "*", "?") are matched literally.
    pattern = os.path.join(glob.escape(screenshots_path), "frame_*.png")
    screenshots = [os.path.basename(path) for path in glob.glob(pattern)]

    def frame_order(name: str):
        # Order numerically so frame_10 follows frame_2 even when the frame
        # numbers are not zero-padded; non-numeric names sort last by name.
        stem = name[len("frame_"):-len(".png")]
        if stem.isdecimal():
            return (0, int(stem), name)
        return (1, 0, name)

    screenshots.sort(key=frame_order)
    return metadata, action_data, screenshots
def group_actions_by_type(actions: List[Dict]) -> Dict[str, List[Dict]]:
    """
    Buckets the given actions by their "type" field.

    Actions without a "type" key are collected under "UNKNOWN". Both the
    buckets and the actions inside each bucket keep the order in which
    they were first encountered.
    """
    buckets: Dict[str, List[Dict]] = {}
    for entry in actions:
        buckets.setdefault(entry.get("type", "UNKNOWN"), []).append(entry)
    return buckets
def find_before_after_pairs(actions: List[Dict]) -> List[Tuple[Optional[Dict], Optional[Dict]]]:
    """
    Analyzes the action log to find before/after pairs of actions.

    An action opens a pair when its type contains "_START" or its
    description contains "BEFORE". It is closed by the first of the next
    four actions whose type contains "_COMPLETE" (or whose description
    contains "AFTER") and whose type also contains the opener's type with
    the "_START" suffix stripped. Actions skipped over between an opener
    and its closer are not reported on their own.

    Any other action is reported alone as ``(action, None)`` unless its
    type contains "_COMPLETE", in which case it is an orphaned closer and
    is dropped.

    Missing or non-string "type"/"description" fields are treated as empty
    strings instead of raising ``KeyError``.

    Returns a list of (before, after) action tuples; ``after`` is ``None``
    when no closer was found.
    """
    def text_field(action: Dict, key: str) -> str:
        value = action.get(key)
        return value if isinstance(value, str) else ""

    pairs: List[Tuple[Optional[Dict], Optional[Dict]]] = []
    total = len(actions)
    i = 0
    while i < total:
        current = actions[i]
        current_type = text_field(current, "type")

        if "_START" in current_type or "BEFORE" in text_field(current, "description"):
            after_action = None
            base_type = current_type.split("_START")[0]
            # The search window is indices i+1 .. i+4, i.e. the next four actions.
            for j in range(i + 1, min(total, i + 5)):
                candidate = actions[j]
                candidate_type = text_field(candidate, "type")
                closes = (
                    "_COMPLETE" in candidate_type
                    or "AFTER" in text_field(candidate, "description")
                )
                if closes and base_type in candidate_type:
                    after_action = candidate
                    i = j  # Resume scanning after the closing action
                    break
            pairs.append((current, after_action))
        elif current and "_COMPLETE" not in current_type:
            # Stand-alone action with no START/COMPLETE structure.
            pairs.append((current, None))

        i += 1
    return pairs
def extract_frame_number(screenshot_name: str) -> int:
    """
    Extract the frame number from a screenshot filename.

    The number is read from the second underscore-separated field, up to
    its first dot (e.g. "frame_0012.png" -> 12). Returns -1 when that field
    is absent or is not an integer.
    """
    try:
        second_field = screenshot_name.split('_', 2)[1]
        digits, _, _ = second_field.partition('.')
        return int(digits)
    except (IndexError, ValueError):
        return -1
def draw_cursor(image, mouse_pos):
    """
    Draw a mouse cursor on the image at the specified position.

    Args:
        image (PIL.Image): The image to draw on
        mouse_pos (tuple): The (x, y) position of the mouse cursor

    Returns:
        PIL.Image: The image carrying the cursor marker. When ``mouse_pos``
        is empty the input is returned untouched. An RGB input is drawn on
        in place; any other mode is first converted to RGB, so a new image
        is returned and callers must use the return value.
    """
    if not mouse_pos:
        return image

    # ImageDraw only alpha-blends when an RGB image is drawn on in "RGBA"
    # mode. With the default mode the alpha of the fill colour is either
    # ignored (RGB) or written straight into the alpha channel (RGBA), so
    # the inner dot would not be semi-transparent.
    if image.mode != "RGB":
        image = image.convert("RGB")
    draw = ImageDraw.Draw(image, "RGBA")

    mouse_x, mouse_y = mouse_pos

    # Draw a blue circle at the mouse position (similar to generate_trajectory.py)
    circle_radius = 15
    circle_outline = 3

    # Draw outer blue circle
    draw.ellipse(
        [(mouse_x - circle_radius, mouse_y - circle_radius),
         (mouse_x + circle_radius, mouse_y + circle_radius)],
        outline=(0, 0, 255, 255),
        width=circle_outline
    )

    # Draw smaller inner circle for better visibility
    draw.ellipse(
        [(mouse_x - 5, mouse_y - 5),
         (mouse_x + 5, mouse_y + 5)],
        fill=(0, 0, 255, 128)  # Semi-transparent blue, blended over the screenshot
    )
    return image
def create_transition_gif(before_path, after_path, output_path, duration=10.0):
    """
    Create a GIF animation transitioning between before and after screenshots.

    The animation is 60 copies of the labelled "BEFORE" frame, 60 fully
    transparent blank frames, then 60 copies of the labelled "AFTER" frame.

    Args:
        before_path (str): Path to the before screenshot
        after_path (str): Path to the after screenshot
        output_path (str): Path to save the resulting GIF
        duration (float): Per-frame duration, passed unchanged to
            ``imageio.mimsave``. NOTE(review): whether this is read as
            seconds or milliseconds depends on the installed imageio
            version -- confirm against the deployed version.

    Returns:
        ``output_path`` on success. On any failure the error is shown with
        ``st.error`` and ``None`` is returned.
    """
    frames_per_stage = 60
    try:
        # Context managers make sure the screenshot file handles are
        # released once the pixel data has been copied out.
        with Image.open(before_path) as before_img, Image.open(after_path) as after_img:
            frame_size = before_img.size

            # Ensure both images are the same size (resize if needed)
            if after_img.size != frame_size:
                after_source = after_img.resize(frame_size)
            else:
                after_source = after_img.copy()

            # Add text overlay to images
            before_frame = add_header_text(before_img.copy(), "BEFORE")
            after_frame = add_header_text(after_source, "AFTER")

        # Blank, fully transparent frame shown between the two states
        transition_frame = Image.new('RGBA', frame_size, (0, 0, 0, 0))

        images = (
            [before_frame] * frames_per_stage
            + [transition_frame] * frames_per_stage
            + [after_frame] * frames_per_stage
        )

        # Save as a GIF that loops forever
        imageio.mimsave(output_path, images, loop=0, duration=duration)
        return output_path
    except Exception as e:
        # Best effort: the caller falls back to showing the still images.
        st.error(f"Error creating GIF: {e}")
        return None
def add_header_text(image, text):
    """
    Add a header label near the top of an image.

    The label is drawn in white with the default PIL font, horizontally
    centred, on a semi-transparent black box.

    Args:
        image (PIL.Image): The image to add text to
        text (str): The text to add

    Returns:
        PIL.Image: Image with text overlay (converted to RGBA when needed)
    """
    # RGBA is required for the translucent background box
    if image.mode != 'RGBA':
        image = image.convert('RGBA')

    canvas = ImageDraw.Draw(image)
    font = ImageFont.load_default()

    # Measure the label with the font that will actually be used
    try:
        left, top, right, bottom = canvas.textbbox((0, 0), text, font=font)
    except AttributeError:
        # textbbox is unavailable: fall back to a rough estimate derived
        # from the image width (this is the only use of font_size).
        font_size = int(image.width * 0.5)
        text_width = len(text) * font_size * 0.6
        text_height = font_size
        st.warning("Could not accurately determine text dimensions with the fallback font.")
    else:
        text_width = right - left
        text_height = bottom - top

    # Centre horizontally (never left of the image edge); fixed top offset
    position = (max(0, (image.width - text_width) // 2), 30)
    text_x, text_y = position

    # Padding between the text and the edge of its background box
    padding_x = 30
    padding_y = 15

    # Background box around the text, clipped to the image bounds
    box_left = max(0, text_x - padding_x)
    box_top = max(0, text_y - padding_y)
    box_right = min(image.width, text_x + text_width + padding_x)
    box_bottom = min(image.height, text_y + text_height + padding_y)

    # Skip the box when clipping left nothing to draw
    if box_right > box_left and box_bottom > box_top:
        canvas.rectangle(
            (box_left, box_top, box_right, box_bottom),
            fill=(0, 0, 0, 180),
        )

    canvas.text(position, text, font=font, fill=(255, 255, 255, 255))
    return image
def create_cursor_legend() -> Image.Image:
    """
    Builds the 180x80 legend image that shows what the mouse-cursor marker
    looks like in the visualizations, next to a two-line text label.
    """
    legend_width, legend_height = 180, 80

    # Mostly opaque white canvas
    legend = Image.new('RGBA', (legend_width, legend_height), (255, 255, 255, 220))
    pen = ImageDraw.Draw(legend)

    # Marker sits at the centre-left of the legend
    cursor_x, cursor_y = 40, 40

    # Outer ring first, then the smaller translucent dot on top
    marker_parts = (
        (15, {"outline": (0, 0, 255, 255), "width": 3}),
        (5, {"fill": (0, 0, 255, 128)}),
    )
    for radius, style in marker_parts:
        pen.ellipse(
            [(cursor_x - radius, cursor_y - radius),
             (cursor_x + radius, cursor_y + radius)],
            **style
        )

    # Two-line caption to the right of the marker
    label_font = ImageFont.load_default()
    black = (0, 0, 0, 255)
    for y_offset, caption in ((-15, "Mouse Cursor"), (5, "Position")):
        pen.text((cursor_x + 30, cursor_y + y_offset), caption, font=label_font, fill=black)

    # Thin grey frame around the whole legend
    pen.rectangle(
        [(0, 0), (legend_width - 1, legend_height - 1)],
        outline=(200, 200, 200, 255),
        width=1,
    )
    return legend
def _lookup_mouse_position(action):
    """Return the cursor coordinates stored on *action*, or None when absent."""
    mouse = action.get("mouse_position")
    if not isinstance(mouse, dict):
        return None
    # Prefer screenshot coordinates but fall back to screen coordinates
    for key in ("screenshot", "screen"):
        if key in mouse:
            return mouse[key]
    return None


def _screenshot_file(category, website, session, filename):
    """Build the on-disk path of one screenshot of a session."""
    return os.path.join(DATA_ROOT, category, website, session, session, "screenshots", filename)


def _action_heading(action):
    """Markdown heading for an 'after' action, with the 'AFTER: ' marker removed."""
    description = action.get('description') or 'N/A'
    return f"### Action: {str(description).replace('AFTER: ', '')}"


def _show_screenshot(path, action, show_cursor):
    """Display one screenshot, overlaying the cursor marker when a position is recorded."""
    mouse_pos = _lookup_mouse_position(action) if show_cursor else None
    if not mouse_pos:
        st.image(path, use_container_width=True)
        return
    # Draw on an in-memory copy; nothing is written to disk.
    with Image.open(path) as img:
        st.image(draw_cursor(img, mouse_pos), use_container_width=True)


def _show_single_action(action, heading, location, screenshots, show_cursor):
    """Render heading, type and screenshot of one action inside the current column."""
    st.markdown(heading)
    st.text(f"Action Type: {action.get('type', 'N/A')}")

    filename = action.get("screenshot")
    if not filename or filename not in screenshots:
        st.info("No screenshot available for this action.")
        return

    path = _screenshot_file(*location, filename)
    if os.path.exists(path):
        _show_screenshot(path, action, show_cursor)
    else:
        st.warning(f"Screenshot not found: {path}")


def _show_transition(before_path, after_path, before_action, after_action, show_cursor):
    """Render the before/after GIF, falling back to two still images on failure."""
    with Image.open(before_path) as probe:
        img_height = min(600, probe.height)

    # All intermediate files live in a throw-away directory so the data
    # folder is never written to and nothing is left behind on any path.
    with tempfile.TemporaryDirectory() as workdir:
        stage_paths = []
        stages = (
            ("before", before_path, before_action),
            ("after", after_path, after_action),
        )
        for label, path, action in stages:
            mouse_pos = _lookup_mouse_position(action) if show_cursor else None
            if mouse_pos:
                annotated = os.path.join(workdir, f"{label}.cursor.png")
                with Image.open(path) as img:
                    draw_cursor(img, mouse_pos).save(annotated)
                path = annotated
            stage_paths.append(path)
        before_src, after_src = stage_paths

        gif_path = create_transition_gif(
            before_src, after_src, os.path.join(workdir, "transition.gif")
        )
        if gif_path:
            # Embed the GIF as a data URI and render it through an HTML component
            with open(gif_path, "rb") as file:
                gif_b64 = base64.b64encode(file.read()).decode("utf-8")
            html_code = f"""
            <div style="display: flex; justify-content: center; margin-top: 20px;">
            <img src="data:image/gif;base64,{gif_b64}" style="max-width: 100%; height: auto;">
            </div>
            """
            components.html(html_code, height=img_height + 50)
        else:
            # Fallback to separate images if GIF creation fails
            st.warning("Could not create animation, showing separate images instead")
            col1, col2 = st.columns(2)
            with col1:
                st.image(before_src, use_container_width=True)
            with col2:
                st.image(after_src, use_container_width=True)


def main():
    """
    Streamlit entry point.

    Lets the user pick a category, website and session in the sidebar, shows
    the session's metadata and statistics, and steps through its
    before/after action pairs with Previous/Next buttons and a slider.
    """
    st.title("Computer Use Interaction Visualizer")

    # Scan data directory
    data_structure = scan_data_directory()
    if not data_structure:
        st.error(f"No data found in {DATA_ROOT}. Please check the directory path.")
        return

    # Sidebar for navigation
    st.sidebar.title("Navigation")

    def reset_navigation():
        # Start from the first pair whenever a different session is chosen
        st.session_state.current_pair_index = 0

    category = st.sidebar.selectbox(
        "Select Category", list(data_structure), on_change=reset_navigation
    )
    if not category:
        st.info("Please select a category.")
        return

    website = st.sidebar.selectbox(
        "Select Website", list(data_structure[category]), on_change=reset_navigation
    )
    if not website:
        st.info("Please select a website.")
        return

    session = st.sidebar.selectbox(
        "Select Session", data_structure[category][website], on_change=reset_navigation
    )
    if not session:
        st.info("Please select a session.")
        return

    # Add cursor legend to sidebar (the PIL image is passed directly)
    st.sidebar.markdown("---")
    st.sidebar.markdown("### Legend")
    st.sidebar.image(create_cursor_legend(), use_container_width=True)

    # Load data for selected session
    try:
        metadata, action_data, screenshots = load_session_data(category, website, session)
    except (OSError, json.JSONDecodeError) as exc:
        st.error(f"Could not load session data: {exc}")
        return

    # Display metadata
    with st.expander("Session Metadata", expanded=False):
        st.json(metadata)

    actions = action_data.get("actions", [])
    grouped_actions = group_actions_by_type(actions)

    # Find before/after pairs and drop those involving a skipped action type
    action_pairs = [
        pair
        for pair in find_before_after_pairs(actions)
        if not any(
            action and action.get("type") in ACTION_PAIRS_TO_SKIP for action in pair
        )
    ]

    # Hardcode visualization settings to always be enabled
    show_gif = True
    show_cursor = True

    # Display session statistics at the top
    with st.expander("Session Statistics", expanded=False):
        st.markdown(f"**Total Actions:** {len(actions)}")
        st.markdown(f"**Total Screenshots:** {len(screenshots)}")
        st.markdown(f"**Action Pairs:** {len(action_pairs)}")
        st.markdown(f"**Session Duration:** {action_data.get('duration', 'N/A')} seconds")

        # Count of action types, most frequent first
        st.subheader("Action Types")
        action_type_counts = sorted(
            ((action_type, len(group)) for action_type, group in grouped_actions.items()),
            key=lambda item: item[1],
            reverse=True,
        )
        for action_type, count in action_type_counts:
            st.text(f"{action_type}: {count}")

    # Slideshow controls
    st.subheader("Interaction Slideshow")

    # current_pair_index is the single source of truth. It is clamped on
    # every run so an index left over from a longer session stays valid.
    last_index = max(0, len(action_pairs) - 1)
    current_index = min(max(st.session_state.get("current_pair_index", 0), 0), last_index)
    st.session_state.current_pair_index = current_index

    def step(delta):
        # Runs before the next rerun; the result is clamped at the top of that run.
        st.session_state.current_pair_index = (
            st.session_state.get("current_pair_index", 0) + delta
        )

    def sync_from_slider():
        st.session_state.current_pair_index = st.session_state.pair_slider

    col1, col2, col3 = st.columns([1, 3, 1])
    with col1:
        st.button("Previous", key="prev_button", on_click=step, args=(-1,))
    with col2:
        # A slider needs min < max, so it is only shown with two or more pairs.
        if last_index > 0:
            # Push the current index into the widget state before the widget
            # is created; otherwise the keyed slider keeps its own stale
            # value and overrides the buttons.
            st.session_state.pair_slider = current_index
            st.slider(
                "Navigate Actions",
                min_value=0,
                max_value=last_index,
                key="pair_slider",
                on_change=sync_from_slider,
            )
    with col3:
        st.button("Next", key="next_button", on_click=step, args=(1,))

    # Display the current pair
    if not action_pairs:
        st.warning("No action pairs found for this session.")
        return

    before_action, after_action = action_pairs[current_index]
    location = (category, website, session)

    # Display action details at the top
    with st.expander("Action Details", expanded=False):
        col1, col2 = st.columns(2)
        with col1:
            if before_action:
                st.subheader("Before Action Details")
                st.json(before_action)
        with col2:
            if after_action:
                st.subheader("After Action Details")
                st.json(after_action)

    has_both_screenshots = bool(
        before_action
        and after_action
        and before_action.get("screenshot")
        and after_action.get("screenshot")
    )

    if show_gif and has_both_screenshots:
        # Animated before/after transition in a single column
        st.markdown(_action_heading(after_action))
        before_screenshot = before_action.get("screenshot")
        after_screenshot = after_action.get("screenshot")

        if before_screenshot in screenshots and after_screenshot in screenshots:
            before_path = _screenshot_file(*location, before_screenshot)
            after_path = _screenshot_file(*location, after_screenshot)
            if os.path.exists(before_path) and os.path.exists(after_path):
                _show_transition(
                    before_path, after_path, before_action, after_action, show_cursor
                )
            else:
                st.warning(f"One or both screenshots not found: {before_path}, {after_path}")
        else:
            st.info("One or both screenshots are missing for this action pair.")
        return

    # Show before and after separately
    col1, col2 = st.columns(2)
    with col1:
        if before_action:
            _show_single_action(
                before_action, "### Before Action", location, screenshots, show_cursor
            )
    with col2:
        if after_action:
            _show_single_action(
                after_action, _action_heading(after_action), location, screenshots, show_cursor
            )
        else:
            st.info("No 'after' action available for this pair.")
# Run the app only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()