import sqlite3
import pandas as pd
import json
import os
import numpy as np

# --- Configuration ---
DB_FILE = "onet.database"
OUTPUT_FILE = "task_ratings_enriched.json"  # Changed output filename


# --- Database Interaction ---
def fetch_data_from_db(db_path):
    """
    Fetches required data from the O*NET SQLite database using JOINs, including DWAs.

    Args:
        db_path (str): Path to the SQLite database file.

    Returns:
        tuple(pandas.DataFrame, pandas.DataFrame): A tuple containing:
            - DataFrame with task ratings info.
            - DataFrame with task-to-DWA mapping.
        Returns (None, None) if the database file doesn't exist or an error occurs.
    """
    if not os.path.exists(db_path):
        print(f"Error: Database file not found at {db_path}")
        return None, None

    try:
        conn = sqlite3.connect(db_path)
        # Construct the SQL query to join the tables and select necessary columns.
        # LEFT JOINs are used for tasks_to_dwas and dwa_reference so that tasks
        # with no DWAs are still returned.
        query = """
        SELECT
            tr.onetsoc_code,
            tr.task_id,
            ts.task,
            od.title AS occupation_title,
            od.description AS occupation_description,
            tr.scale_id,
            tr.category,
            tr.data_value,
            dr.dwa_title  -- Added DWA title
        FROM
            task_ratings tr
        JOIN
            task_statements ts ON tr.task_id = ts.task_id
        JOIN
            occupation_data od ON tr.onetsoc_code = od.onetsoc_code
        LEFT JOIN
            tasks_to_dwas td ON tr.onetsoc_code = td.onetsoc_code AND tr.task_id = td.task_id
        LEFT JOIN
            dwa_reference dr ON td.dwa_id = dr.dwa_id;
        """
        df = pd.read_sql_query(query, conn)
        conn.close()
        print(
            f"Successfully fetched {len(df)} records (including DWA info) from the database."
        )

        if df.empty:
            print("Warning: Fetched DataFrame is empty.")
            # Return empty DataFrames with expected columns if the main fetch is empty
            ratings_cols = [
                "onetsoc_code",
                "task_id",
                "task",
                "occupation_title",
                "occupation_description",
                "scale_id",
                "category",
                "data_value",
            ]
            dwa_cols = ["onetsoc_code", "task_id", "dwa_title"]
            return pd.DataFrame(columns=ratings_cols), pd.DataFrame(columns=dwa_cols)

        # Remove duplicates caused by joining ratings with potentially multiple
        # DWAs per task: keep only unique combinations of the core task/rating
        # info before processing.
        core_cols = [
            "onetsoc_code",
            "task_id",
            "task",
            "occupation_title",
            "occupation_description",
            "scale_id",
            "category",
            "data_value",
        ]
        # Check that all core columns exist before attempting to drop duplicates
        missing_core_cols = [col for col in core_cols if col not in df.columns]
        if missing_core_cols:
            print(f"Error: Missing core columns in fetched data: {missing_core_cols}")
            return None, None
        ratings_df = df[core_cols].drop_duplicates().reset_index(drop=True)

        # Get unique DWA info separately
        dwa_cols = ["onetsoc_code", "task_id", "dwa_title"]
        # Check that all DWA columns exist before processing
        if all(col in df.columns for col in dwa_cols):
            dwas_df = (
                df[dwa_cols]
                .dropna(subset=["dwa_title"])
                .drop_duplicates()
                .reset_index(drop=True)
            )
        else:
            print("Warning: DWA related columns missing, creating empty DWA DataFrame.")
            dwas_df = pd.DataFrame(columns=dwa_cols)  # Create empty df if columns missing

        return ratings_df, dwas_df  # Return two dataframes now

    except sqlite3.Error as e:
        print(f"SQLite error: {e}")
        if "conn" in locals() and conn:
            conn.close()
        return None, None  # Return None for both if error
    except Exception as e:
        print(f"An error occurred during data fetching: {e}")
        if "conn" in locals() and conn:
            conn.close()
        return None, None  # Return None for both if error
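
# A minimal usage sketch for the fetcher above. This helper is hypothetical
# and never called by the script; it only illustrates the shapes the two
# returned frames are expected to have, based on the SELECT list in the query.
def _sketch_fetch_usage(db_path=DB_FILE):
    ratings_df, dwas_df = fetch_data_from_db(db_path)
    if ratings_df is not None:
        # One row per (onetsoc_code, task_id, scale_id, category) rating
        print("ratings columns:", ratings_df.columns.tolist())
    if dwas_df is not None:
        # Zero or more dwa_title rows per (onetsoc_code, task_id)
        print("dwa columns:", dwas_df.columns.tolist())
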

# --- Data Processing ---
def process_task_ratings_with_dwas(ratings_df, dwas_df):
    """
    Processes the fetched data to group, pivot frequency, calculate averages,
    structure the output, and add associated DWAs.

    Args:
        ratings_df (pandas.DataFrame): The input DataFrame with task ratings info.
        dwas_df (pandas.DataFrame): The input DataFrame with task-to-DWA mapping.
            Can be None or empty.

    Returns:
        list: A list of dictionaries, each representing an enriched task rating
            with DWAs. Returns None if the input ratings DataFrame is invalid.
    """
    if ratings_df is None or not isinstance(ratings_df, pd.DataFrame):
        print("Error: Input ratings DataFrame is invalid.")
        return None
    if ratings_df.empty:
        print(
            "Warning: Input ratings DataFrame is empty. Processing will yield empty result."
        )
        # An empty frame falls through and simply yields an empty result list.

    # Ensure dwas_df is a DataFrame, even if empty
    if dwas_df is None or not isinstance(dwas_df, pd.DataFrame):
        print("Warning: Invalid or missing DWA DataFrame. Proceeding without DWA data.")
        dwas_df = pd.DataFrame(columns=["onetsoc_code", "task_id", "dwa_title"])

    print("Starting data processing...")

    # --- 1. Handle Frequency (FT) ---
    freq_df = ratings_df[ratings_df["scale_id"] == "FT"].copy()
    if not freq_df.empty:
        freq_pivot = freq_df.pivot_table(
            index=["onetsoc_code", "task_id"],
            columns="category",
            values="data_value",
            fill_value=0,
        )
        freq_pivot.columns = [
            f"frequency_category_{int(col)}" for col in freq_pivot.columns
        ]
        print(f"Processed Frequency data. Shape: {freq_pivot.shape}")
    else:
        print("No Frequency (FT) data found.")
        # Create an empty DataFrame with the multi-index to allow merging later
        idx = pd.MultiIndex(
            levels=[[], []], codes=[[], []], names=["onetsoc_code", "task_id"]
        )
        freq_pivot = pd.DataFrame(index=idx)

    # --- 2. Handle Importance (IM, IJ) ---
    imp_df = ratings_df[ratings_df["scale_id"].isin(["IM", "IJ"])].copy()
    if not imp_df.empty:
        imp_avg = (
            imp_df.groupby(["onetsoc_code", "task_id"])["data_value"]
            .mean()
            .reset_index()
        )
        imp_avg.rename(columns={"data_value": "importance_average"}, inplace=True)
        print(f"Processed Importance data. Shape: {imp_avg.shape}")
    else:
        print("No Importance (IM, IJ) data found.")
        imp_avg = pd.DataFrame(
            columns=["onetsoc_code", "task_id", "importance_average"]
        )

    # --- 3. Handle Relevance (RT) ---
    rel_df = ratings_df[ratings_df["scale_id"] == "RT"].copy()
    if not rel_df.empty:
        rel_avg = (
            rel_df.groupby(["onetsoc_code", "task_id"])["data_value"]
            .mean()
            .reset_index()
        )
        rel_avg.rename(columns={"data_value": "relevance_average"}, inplace=True)
        print(f"Processed Relevance data. Shape: {rel_avg.shape}")
    else:
        print("No Relevance (RT) data found.")
        rel_avg = pd.DataFrame(columns=["onetsoc_code", "task_id", "relevance_average"])

    # --- 4. Process DWAs ---
    if dwas_df is not None and not dwas_df.empty and "dwa_title" in dwas_df.columns:
        print("Processing DWA data...")
        # Group DWAs by task and aggregate titles into a list
        dwas_grouped = (
            dwas_df.groupby(["onetsoc_code", "task_id"])["dwa_title"]
            .apply(list)
            .reset_index()
        )
        dwas_grouped.rename(
            columns={"dwa_title": "dwas"}, inplace=True
        )  # Rename column to 'dwas'
        print(f"Processed DWA data. Shape: {dwas_grouped.shape}")
    else:
        print("No valid DWA data found or provided for processing.")
        dwas_grouped = None  # Set to None if no DWAs
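
    # Shape sketch for steps 1-4 above (values are illustrative placeholders,
    # not real O*NET figures): the FT pivot turns long-format rows such as
    #     onetsoc_code   task_id  category  data_value
    #     11-1011.00     101      1         20.0
    #     11-1011.00     101      2         80.0
    # into one row per (onetsoc_code, task_id) with columns
    # frequency_category_1=20.0 and frequency_category_2=80.0 (missing
    # categories filled with 0), while the DWA groupby+apply(list) collapses
    # one row per DWA title into a single Python list per task.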

    # --- 5. Get Base Task/Occupation Info ---
    base_cols = [
        "onetsoc_code",
        "task_id",
        "task",
        "occupation_title",
        "occupation_description",
    ]
    # Check that base columns exist in ratings_df
    missing_base_cols = [col for col in base_cols if col not in ratings_df.columns]
    if missing_base_cols:
        print(
            f"Error: Missing base info columns in ratings_df: {missing_base_cols}. Cannot proceed."
        )
        return None

    if not ratings_df.empty:
        base_info = (
            ratings_df[base_cols]
            .drop_duplicates()
            .set_index(["onetsoc_code", "task_id"])
        )
        print(f"Extracted base info. Shape: {base_info.shape}")
    else:
        print("Cannot extract base info from empty ratings DataFrame.")
        # Create an empty df with the multi-index to avoid errors later
        idx = pd.MultiIndex(
            levels=[[], []], codes=[[], []], names=["onetsoc_code", "task_id"]
        )
        base_info = pd.DataFrame(
            index=idx,
            columns=[
                col for col in base_cols if col not in ["onetsoc_code", "task_id"]
            ],
        )

    # --- 6. Merge Processed Data ---
    print("Merging processed data...")
    # Start with base_info, which is indexed by ['onetsoc_code', 'task_id']
    final_df = base_info.merge(
        freq_pivot, left_index=True, right_index=True, how="left"
    )
    # Reset index before merging non-indexed dfs
    final_df = final_df.reset_index()

    # Merge averages; check that they are not empty before merging
    if not imp_avg.empty:
        final_df = final_df.merge(imp_avg, on=["onetsoc_code", "task_id"], how="left")
    else:
        final_df["importance_average"] = np.nan  # Add column if imp_avg was empty

    if not rel_avg.empty:
        final_df = final_df.merge(rel_avg, on=["onetsoc_code", "task_id"], how="left")
    else:
        final_df["relevance_average"] = np.nan  # Add column if rel_avg was empty

    # Merge DWAs if available
    if dwas_grouped is not None and not dwas_grouped.empty:
        final_df = final_df.merge(
            dwas_grouped, on=["onetsoc_code", "task_id"], how="left"
        )  # Merge the dwas list
        # Fill NaN in 'dwas' column (for tasks with no DWAs) with empty lists;
        # check that the column exists before applying the function
        if "dwas" in final_df.columns:
            final_df["dwas"] = final_df["dwas"].apply(
                lambda x: x if isinstance(x, list) else []
            )  # Ensure tasks without DWAs get []
        else:
            print("Warning: 'dwas' column not created during merge.")
            final_df["dwas"] = [[] for _ in range(len(final_df))]  # Add empty list column
    else:
        # Add an empty 'dwas' column if no DWA data was processed or merged
        final_df["dwas"] = [[] for _ in range(len(final_df))]

    print(f"Final merged data shape: {final_df.shape}")

    # Convert DataFrame to list of dictionaries for JSON output, replacing
    # numpy NaN with Python None for JSON compatibility
    final_df = final_df.replace({np.nan: None})
    result_list = final_df.to_dict(orient="records")

    return result_list
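
# Shape of one enriched record returned above (a sketch: the keys follow the
# merge logic in process_task_ratings_with_dwas, the values are placeholders):
#
#   {
#       "onetsoc_code": "11-1011.00",
#       "task_id": 101,
#       "task": "...",
#       "occupation_title": "...",
#       "occupation_description": "...",
#       "frequency_category_1": 20.0,   # one column per FT category present
#       "importance_average": 4.2,      # mean of IM/IJ data_value
#       "relevance_average": 85.0,      # mean of RT data_value
#       "dwas": ["..."],                # [] when a task has no DWAs
#   }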

# --- Output ---
def write_to_json(data, output_path):
    """
    Writes the processed data to a JSON file.

    Args:
        data (list): The list of dictionaries to write.
        output_path (str): Path to the output JSON file.
    """
    if data is None:
        print("No data to write to JSON.")
        return
    if not isinstance(data, list):
        print(
            f"Error: Data to write is not a list (type: {type(data)}). Cannot write to JSON."
        )
        return

    # Create the output directory if it doesn't exist
    output_dir = os.path.dirname(output_path)
    if output_dir and not os.path.exists(output_dir):
        try:
            os.makedirs(output_dir)
            print(f"Created output directory: {output_dir}")
        except OSError as e:
            print(f"Error creating output directory {output_dir}: {e}")
            return  # Exit if the directory cannot be created

    try:
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=4, ensure_ascii=False)
        print(f"Successfully wrote enriched data to {output_path}")
    except IOError as e:
        print(f"Error writing JSON file to {output_path}: {e}")
    except TypeError as e:
        print(f"Error during JSON serialization: {e}. Check data types.")
    except Exception as e:
        print(f"An unexpected error occurred during JSON writing: {e}")


# --- Main Execution ---
if __name__ == "__main__":
    print("Starting O*NET Task Ratings & DWAs Enrichment Script...")

    # 1. Fetch data (both the ratings and the DWA mapping)
    ratings_data_df, dwas_data_df = fetch_data_from_db(DB_FILE)

    # 2. Process data. Proceed only if ratings_data_df is a valid DataFrame
    # (even if empty); dwas_data_df may be None or empty and is handled inside
    # the processing function.
    if isinstance(ratings_data_df, pd.DataFrame):
        enriched_data = process_task_ratings_with_dwas(ratings_data_df, dwas_data_df)

        # 3. Write output (an empty list is still valid output)
        if enriched_data is not None:
            write_to_json(enriched_data, OUTPUT_FILE)
        else:
            print("Data processing failed or returned None. No output file generated.")
    else:
        print(
            "Data fetching failed or returned invalid type for ratings data. Script terminated."
        )

    print("Script finished.")
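
# To run (a sketch; the filename is hypothetical and assumes DB_FILE points at
# a valid O*NET SQLite export in the working directory):
#
#   python enrich_task_ratings.py
#
# A quick post-run spot check of the JSON output:
#
#   python -c "import json; d = json.load(open('task_ratings_enriched.json')); print(len(d), 'records')"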