import json
import os
import sqlite3

import numpy as np
import pandas as pd

# --- Configuration ---
DB_FILE = "onet.database"
OUTPUT_FILE = "task_ratings_enriched.json"


# --- Database Interaction ---
def fetch_data_from_db(db_path):
    """
    Fetch joined task-rating data from the O*NET SQLite database.

    Joins task_ratings with task_statements (on task_id) and
    occupation_data (on onetsoc_code) so every rating row carries its
    task text and occupation title/description.

    Args:
        db_path (str): Path to the SQLite database file.

    Returns:
        pandas.DataFrame | None: One row per rating with columns
        onetsoc_code, task_id, task, occupation_title,
        occupation_description, scale_id, category, data_value.
        None if the file is missing or any error occurs.
    """
    if not os.path.exists(db_path):
        print(f"Error: Database file not found at {db_path}")
        return None

    query = """
    SELECT
        tr.onetsoc_code,
        tr.task_id,
        ts.task,
        od.title AS occupation_title,
        od.description AS occupation_description,
        tr.scale_id,
        tr.category,
        tr.data_value
    FROM
        task_ratings tr
    JOIN
        task_statements ts ON tr.task_id = ts.task_id
    JOIN
        occupation_data od ON tr.onetsoc_code = od.onetsoc_code;
    """

    # conn is initialized to None so the finally-block close is safe even
    # when sqlite3.connect itself raises (the original code could hit a
    # NameError on `conn` inside the except handler in that case).
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        df = pd.read_sql_query(query, conn)
        print(f"Successfully fetched {len(df)} records from the database.")
        return df
    except sqlite3.Error as e:
        print(f"SQLite error: {e}")
        return None
    except Exception as e:
        print(f"An error occurred during data fetching: {e}")
        return None
    finally:
        if conn is not None:
            conn.close()


# --- Data Processing ---
def process_task_ratings(df):
    """
    Enrich task ratings: pivot frequency categories, average importance
    and relevance, and merge everything onto the base task/occupation info.

    Args:
        df (pandas.DataFrame): Joined data from fetch_data_from_db with
            columns onetsoc_code, task_id, task, occupation_title,
            occupation_description, scale_id, category, data_value.

    Returns:
        list[dict] | None: One dict per (occupation, task) pair with
        frequency_category_<n> columns, importance_average and
        relevance_average (None where no rating exists). None if the
        input DataFrame is empty or invalid.
    """
    if df is None or df.empty:
        print("Error: Input DataFrame is empty or invalid.")
        return None

    print("Starting data processing...")

    # --- 1. Handle Frequency (FT) ---
    # Pivot frequency ratings so each category becomes its own column,
    # indexed by (occupation, task). Missing categories are filled with 0,
    # interpreting "no rating" as 0% for that frequency bucket.
    freq_df = df[df["scale_id"] == "FT"].copy()
    freq_pivot = freq_df.pivot_table(
        index=["onetsoc_code", "task_id"],
        columns="category",
        values="data_value",
        fill_value=0,
    )
    # Rename e.g. category 1 -> "frequency_category_1".
    freq_pivot.columns = [
        f"frequency_category_{int(col)}" for col in freq_pivot.columns
    ]
    print(f"Processed Frequency data. Shape: {freq_pivot.shape}")

    # --- 2. Handle Importance (IM, IJ) ---
    # Average the two importance scales per (occupation, task). pandas'
    # .mean() skips NaN values by default, so sparse ratings are handled.
    imp_df = df[df["scale_id"].isin(["IM", "IJ"])].copy()
    imp_avg = (
        imp_df.groupby(["onetsoc_code", "task_id"])["data_value"].mean().reset_index()
    )
    imp_avg = imp_avg.rename(columns={"data_value": "importance_average"})
    print(f"Processed Importance data. Shape: {imp_avg.shape}")

    # --- 3. Handle Relevance (RT) ---
    rel_df = df[df["scale_id"] == "RT"].copy()
    rel_avg = (
        rel_df.groupby(["onetsoc_code", "task_id"])["data_value"].mean().reset_index()
    )
    rel_avg = rel_avg.rename(columns={"data_value": "relevance_average"})
    print(f"Processed Relevance data. Shape: {rel_avg.shape}")

    # --- 4. Get Base Task/Occupation Info ---
    # Unique (occupation, task) pairs with their descriptive text, indexed
    # for the index-aligned merge with the frequency pivot below.
    base_info = (
        df[
            [
                "onetsoc_code",
                "task_id",
                "task",
                "occupation_title",
                "occupation_description",
            ]
        ]
        .drop_duplicates()
        .set_index(["onetsoc_code", "task_id"])
    )
    print(f"Extracted base info. Shape: {base_info.shape}")

    # --- 5. Merge Processed Data ---
    # Left joins keep every (occupation, task) pair from base_info; pairs
    # lacking frequency/importance/relevance ratings get NaN in those
    # columns (deliberately left as NaN -> None rather than filled with 0,
    # so "not rated" stays distinguishable from a genuine 0 rating).
    print("Merging processed data...")
    final_df = base_info.merge(
        freq_pivot, left_index=True, right_index=True, how="left"
    )
    # The average frames are flat (not multi-indexed), so merge on columns.
    final_df = final_df.reset_index()
    final_df = final_df.merge(imp_avg, on=["onetsoc_code", "task_id"], how="left")
    final_df = final_df.merge(rel_avg, on=["onetsoc_code", "task_id"], how="left")
    print(f"Final merged data shape: {final_df.shape}")

    # Replace numpy NaN with Python None so json.dump emits null instead
    # of failing on NaN.
    final_df = final_df.replace({np.nan: None})
    return final_df.to_dict(orient="records")


# --- Output ---
def write_to_json(data, output_path):
    """
    Write the processed records to a JSON file.

    Args:
        data (list): List of record dicts to serialize.
        output_path (str): Path of the JSON file to create/overwrite.
    """
    if data is None:
        print("No data to write to JSON.")
        return
    try:
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=4, ensure_ascii=False)
        print(f"Successfully wrote enriched data to {output_path}")
    except IOError as e:
        print(f"Error writing JSON file: {e}")
    except Exception as e:
        print(f"An unexpected error occurred during JSON writing: {e}")


# --- Main Execution ---
if __name__ == "__main__":
    print("Starting O*NET Task Ratings Enrichment Script...")

    # 1. Fetch data
    raw_data_df = fetch_data_from_db(DB_FILE)

    # 2. Process data
    if raw_data_df is not None:
        enriched_data = process_task_ratings(raw_data_df)

        # 3. Write output
        if enriched_data:
            write_to_json(enriched_data, OUTPUT_FILE)
        else:
            print("Data processing failed. No output file generated.")
    else:
        print("Data fetching failed. Script terminated.")

    print("Script finished.")