"""Build ancestor -> descendants mappings for models stored in a parquet file.

Reads ``public/parents.parquet`` (assumed columns: ``id`` and a list column
``base_models`` -- TODO confirm schema against the producer), computes the
transitive set of descendants for every model via a recursive CTE, and writes:

* ``public/ancestor_children.parquet`` -- full mapping with child counts,
* ``public/ancestor_children.example.yaml`` -- a 10-row inspection sample,

then logs a sample of models with no children and the top-10 ancestors.
"""

import logging
import time

import duckdb
import yaml

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# Single in-memory DuckDB connection shared by every step below.
con = duckdb.connect()


def execute_with_timing(query, description):
    """Execute *query* on the shared connection and log its wall-clock duration."""
    start_time = time.perf_counter()  # perf_counter: highest-resolution timer
    con.execute(query)
    elapsed = time.perf_counter() - start_time
    logging.info("Completed %s in %.6f seconds.", description, elapsed)


total_start_time = time.perf_counter()

# Load parents.parquet into an in-memory table so later steps avoid re-reading it.
execute_with_timing(
    """
    CREATE TABLE parents_in_memory AS
    SELECT * FROM parquet_scan('public/parents.parquet')
    """,
    "Loaded parents.parquet into RAM",
)

# Step 1: assign a dense numerical ID to each (string) model ID; the joins in
# the recursive CTE below are cheaper on integers than on strings.
execute_with_timing(
    """
    CREATE TABLE unique_ids AS
    SELECT id, ROW_NUMBER() OVER () AS tmp_id
    FROM parents_in_memory
    """,
    "Step 1: Created unique_ids table",
)

# Step 2: explode base_models so each (child, base_model) pair becomes one row.
execute_with_timing(
    """
    CREATE TABLE unnested_models AS
    SELECT
        u.tmp_id AS child_tmp_id,           -- numerical ID of the child model
        UNNEST(p.base_models) AS base_model
    FROM parents_in_memory p
    JOIN unique_ids u ON p.id = u.id
    WHERE p.base_models IS NOT NULL         -- skip models without base models
    """,
    "Step 2: Created unnested_models table",
)

# Step 3: direct parent -> child edges expressed purely in numerical IDs.
execute_with_timing(
    """
    CREATE TABLE parent_level AS
    SELECT
        u.child_tmp_id,                     -- numerical ID of the child model
        b.tmp_id AS base_tmp_id             -- numerical ID of the parent model
    FROM unnested_models u
    JOIN unique_ids b ON u.base_model = b.id
    """,
    "Step 3: Created parent_level table",
)

# Step 4: recursive CTE walking parent -> child edges to collect, for every
# ancestor, the full set of direct and indirect descendants (string IDs).
execute_with_timing(
    """
    CREATE TABLE ancestor_children AS
    WITH RECURSIVE ancestor_children_cte AS (
        SELECT
            base_tmp_id AS ancestor_tmp_id,  -- direct parent is the first ancestor
            child_tmp_id AS child_tmp_id,    -- direct child
            1 AS depth                       -- depth counter bounds the recursion
        FROM parent_level
        UNION ALL
        SELECT
            ac.ancestor_tmp_id,              -- propagate the original ancestor
            pl.child_tmp_id,                 -- extend the chain one level down
            ac.depth + 1                     -- increment depth counter
        FROM parent_level pl
        JOIN ancestor_children_cte ac ON pl.base_tmp_id = ac.child_tmp_id
        WHERE ac.depth < 20                  -- limit recursion to 20 levels
    )
    SELECT
        a.id AS ancestor,
        LIST(DISTINCT c.id) AS all_children
    FROM ancestor_children_cte ac
    JOIN unique_ids a ON ac.ancestor_tmp_id = a.tmp_id
    JOIN unique_ids c ON c.tmp_id = ac.child_tmp_id
    GROUP BY ancestor
    """,
    "Step 4: Created ancestor_children table with string IDs",
)

# Direct (one-level) children per parent, mapped back to string IDs.
execute_with_timing(
    """
    CREATE TABLE direct_children_mapping AS
    SELECT
        p.id AS parent,
        LIST(DISTINCT u.id) AS direct_children
    FROM parents_in_memory p
    LEFT JOIN unnested_models um ON p.id = um.base_model
    LEFT JOIN unique_ids u ON um.child_tmp_id = u.tmp_id
    GROUP BY p.id
    """,
    "Created direct_children_mapping table",
)

# Write the final result to a parquet file, using direct_children_mapping for
# the direct_children column.
start_time = time.perf_counter()
con.execute(
    """
    COPY (
        SELECT
            ac.ancestor AS ancestor,
            dcm.direct_children AS direct_children,
            ac.all_children AS all_children,
            CAST(array_length(ac.all_children) AS INTEGER) AS all_children_count,
            CAST(array_length(dcm.direct_children) AS INTEGER) AS direct_children_count
        FROM ancestor_children ac
        LEFT JOIN direct_children_mapping dcm ON ac.ancestor = dcm.parent
        ORDER BY all_children_count DESC
    ) TO 'public/ancestor_children.parquet' (FORMAT 'parquet')
    """
)
logging.info(
    "Written ancestor_children to parquet file in %.6f seconds.",
    time.perf_counter() - start_time,
)

# Write a sample of 10 rows with non-empty children to a YAML file for inspection.
start_time = time.perf_counter()
sample_data = con.execute(
    """
    SELECT ac.ancestor, dcm.direct_children, ac.all_children
    FROM ancestor_children ac
    LEFT JOIN direct_children_mapping dcm ON ac.ancestor = dcm.parent
    WHERE array_length(ac.all_children) > 0
    LIMIT 10
    """
).fetchall()
with open("public/ancestor_children.example.yaml", "w") as f:
    yaml.safe_dump(sample_data, f, default_flow_style=False)
logging.info(
    "Written sample data to YAML file in %.6f seconds.",
    time.perf_counter() - start_time,
)

# Sample 10 models with no children (direct or indirect).
# BUGFIX: the previous version filtered ancestor_children on
# array_length(all_children) = 0, which can never match -- every row in that
# table is produced by at least one parent->child edge, so all_children always
# holds >= 1 element. Instead, anti-join to find models that never appear as
# an ancestor at all.
start_time = time.perf_counter()
no_children_data = con.execute(
    """
    SELECT p.id
    FROM parents_in_memory p
    LEFT JOIN ancestor_children ac ON p.id = ac.ancestor
    WHERE ac.ancestor IS NULL
    LIMIT 10
    """
).fetchall()
logging.info(
    "Fetched sample data of models with no children in %.6f seconds.",
    time.perf_counter() - start_time,
)
logging.info("Examples of models with no children (direct or indirect):")
for model in no_children_data:
    logging.info(model)

# List the top-10 ancestors by total descendant count, with direct-child counts.
start_time = time.perf_counter()
top_ancestors = con.execute(
    """
    SELECT
        ac.ancestor,
        array_length(ac.all_children) AS num_all_children,
        array_length(dcm.direct_children) AS num_direct_children
    FROM ancestor_children ac
    LEFT JOIN direct_children_mapping dcm ON ac.ancestor = dcm.parent
    ORDER BY num_all_children DESC
    LIMIT 10
    """
).fetchall()
logging.info(
    "Listed top 10 ancestors with the most children in %.6f seconds.",
    time.perf_counter() - start_time,
)
logging.info(
    "Top 10 ancestors with the most children and their number of direct children:"
)
for ancestor in top_ancestors:
    logging.info(ancestor)

con.close()  # release the in-memory database before reporting total time

# Log the total processing time (kept on stdout, as in the original script).
total_execution_time = time.perf_counter() - total_start_time
print(f"Total processing time: {total_execution_time:.6f} seconds")