import logging

import numpy as np
import pandas as pd
from scipy.stats import median_abs_deviation

from .data import get_db_connection

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')


def _convert_to_minutes(level: float) -> float:
    """Convert an O*NET 'Frequency' scale value (level) to estimated minutes per day.

    This logic is derived from the `preprocessing_time_estimates` function
    in the original analysis notebook; the mapping is an interpretation of
    the O*NET frequency scale.

    Args:
        level: O*NET frequency level (1-7); may be NaN.

    Returns:
        Estimated minutes per day; 0 for NaN or unrecognized levels.
    """
    if pd.isna(level):
        return 0
    return {
        1: 0,    # Yearly or less
        2: 2,    # Several times a year
        3: 10,   # Several times a month
        4: 30,   # Several times a week
        5: 120,  # Daily
        6: 240,  # Several times a day
        7: 480,  # Hourly or more
    }.get(int(level), 0)


def _mad_z_score(series: pd.Series) -> pd.Series:
    """Calculate the robust Z-score using Median Absolute Deviation (MAD).

    Derived from 'cell7' of the original analysis. Returns an all-NaN
    series when the input is all-null or the MAD is zero, since the score
    is undefined in those cases.
    """
    if series.isnull().all():
        return pd.Series([np.nan] * len(series), index=series.index)
    median = series.median()
    # scale='normal' makes MAD comparable to the standard deviation for a
    # normal distribution.
    mad = median_abs_deviation(series.dropna(), scale='normal')
    if mad == 0:
        return pd.Series([np.nan] * len(series), index=series.index)
    return (series - median) / mad


def _load_base_tasks(conn) -> pd.DataFrame:
    """Load the O*NET tables and merge them into one base DataFrame."""
    logging.info("Loading data from O*NET database...")
    task_ratings_df = pd.read_sql_query("SELECT * FROM task_ratings", conn)
    task_statements_df = pd.read_sql_query("SELECT * FROM task_statements", conn)
    occupations_df = pd.read_sql_query("SELECT * FROM occupation_data", conn)

    # Merging on both 'onetsoc_code' and 'task_id' is crucial to avoid
    # creating duplicate columns from the overlapping 'onetsoc_code'.
    logging.info("Merging base tables...")
    tasks_df = pd.merge(task_ratings_df, task_statements_df,
                        on=['onetsoc_code', 'task_id'])
    return pd.merge(tasks_df, occupations_df, on='onetsoc_code')


def _build_time_estimates(tasks_df: pd.DataFrame) -> pd.DataFrame:
    """Filter to 'atomic tasks' (scale_id='FR') and add minutes/day estimates.

    This is the core of the analysis, focusing on tasks with frequency
    ratings (from `preprocessing_time_estimates`).
    """
    logging.info("Filtering for 'atomic tasks' (scale_id='FR') and calculating time estimates...")
    # Strip whitespace from scale_id to ensure the filter works correctly.
    tasks_df['scale_id'] = tasks_df['scale_id'].str.strip()
    atomic_tasks = tasks_df[tasks_df['scale_id'] == 'FR'].copy()

    # Convert frequency confidence intervals into minutes/day.
    atomic_tasks['lb_estimate_in_minutes'] = \
        atomic_tasks['lower_ci_bound'].apply(_convert_to_minutes)
    atomic_tasks['ub_estimate_in_minutes'] = \
        atomic_tasks['upper_ci_bound'].apply(_convert_to_minutes)
    atomic_tasks['estimate_midpoint'] = (
        atomic_tasks['lb_estimate_in_minutes']
        + atomic_tasks['ub_estimate_in_minutes']) / 2
    return atomic_tasks


def _add_derived_columns(atomic_tasks: pd.DataFrame) -> None:
    """Add grouping, range, and ratio columns used by the plots (in place).

    Derived from the `cell` logic of the original analysis.
    """
    logging.info("Adding derived columns for analysis...")
    # Add `onetsoc_major` for grouping by occupation category.
    atomic_tasks['onetsoc_major'] = atomic_tasks['onetsoc_code'].str[:2]

    # Calculate estimate_range and estimate_ratio used in several plots.
    atomic_tasks['estimate_range'] = (
        atomic_tasks['ub_estimate_in_minutes']
        - atomic_tasks['lb_estimate_in_minutes'])

    # Only divide where the lower bound is positive: dividing the full
    # columns would evaluate x/0 for zero lower bounds (emitting inf and
    # divide warnings) before the .loc mask discards those rows.
    lb_positive = atomic_tasks['lb_estimate_in_minutes'] > 0
    atomic_tasks['estimate_ratio'] = np.nan
    atomic_tasks.loc[lb_positive, 'estimate_ratio'] = (
        atomic_tasks.loc[lb_positive, 'ub_estimate_in_minutes']
        / atomic_tasks.loc[lb_positive, 'lb_estimate_in_minutes'])


def _add_outlier_scores(atomic_tasks: pd.DataFrame) -> pd.DataFrame:
    """Attach per-occupation standard and robust (MAD) Z-scores.

    Derived from 'cell6' and 'cell7' of the original analysis. Returns a
    new DataFrame (the merge copies).
    """
    logging.info("Calculating standard and robust Z-scores for outlier detection...")
    # Standard Z-score: per-occupation mean/std of the midpoint estimate.
    grouped_stats = atomic_tasks.groupby('onetsoc_code')['estimate_midpoint'].agg(['mean', 'std'])
    atomic_tasks = atomic_tasks.merge(grouped_stats, on='onetsoc_code', how='left')

    # Calculate Z-score, avoiding division by zero if std is 0 (or NaN for
    # single-row groups).
    non_zero_std = atomic_tasks['std'].notna() & (atomic_tasks['std'] != 0)
    atomic_tasks['z_score'] = np.nan
    atomic_tasks.loc[non_zero_std, 'z_score'] = (
        (atomic_tasks.loc[non_zero_std, 'estimate_midpoint']
         - atomic_tasks.loc[non_zero_std, 'mean'])
        / atomic_tasks.loc[non_zero_std, 'std'])

    # Robust Z-score (using MAD), computed independently per occupation.
    atomic_tasks['robust_z_score'] = (
        atomic_tasks.groupby('onetsoc_code')['estimate_midpoint']
        .transform(_mad_z_score))
    return atomic_tasks


def run_preprocessing() -> pd.DataFrame:
    """Main orchestrator for the preprocessing pipeline.

    Faithfully reproduces the data transformation pipeline from the
    original `analysis.py` script, including the
    `preprocessing_time_estimates` and cell-specific data manipulations.

    Returns:
        pd.DataFrame: A fully preprocessed DataFrame ready for the
        generators, or an empty DataFrame if any step fails.
    """
    logging.info("Starting data preprocessing...")
    conn = None
    try:
        conn = get_db_connection()
        if conn is None:
            raise ConnectionError("Could not establish database connection.")

        # 1-2. Load and merge the base tables.
        tasks_df = _load_base_tasks(conn)
        # 3. Create "atomic tasks" and time estimates.
        atomic_tasks = _build_time_estimates(tasks_df)
        # 4. Add derived columns for analysis.
        _add_derived_columns(atomic_tasks)
        # 5. Calculate outlier scores.
        atomic_tasks = _add_outlier_scores(atomic_tasks)

        # 6. Prepare for other generators.
        # NOTE: The data for the 'task_breakdown_by_occupation' generator,
        # specifically the 'remote_status' and 'estimateable' columns, is not
        # available in the O*NET database. This data was likely loaded from a
        # separate file (e.g., 'tasks_clean.parquet') in the original
        # notebook. For now, we add placeholder columns.
        atomic_tasks['remote_status'] = 'unknown'
        atomic_tasks['estimateable'] = 'unknown'

        logging.info("Data preprocessing complete.")
        return atomic_tasks
    except Exception as e:
        logging.error("An error occurred during preprocessing: %s", e, exc_info=True)
        # Return an empty DataFrame on failure to prevent downstream errors.
        return pd.DataFrame()
    finally:
        if conn:
            conn.close()
            logging.info("Database connection closed.")


if __name__ == '__main__':
    # This allows the preprocessing to be run directly for testing or
    # debugging. Note: Requires data to be set up first by running data.py.
    try:
        processed_data = run_preprocessing()
        if not processed_data.empty:
            print("Preprocessing successful. DataFrame shape:", processed_data.shape)
            print("Columns:", processed_data.columns.tolist())
            print(processed_data.head())
            # Save to a temporary file to inspect the output.
            output_path = "temp_preprocessed_data.csv"
            processed_data.to_csv(output_path, index=False)
            print(f"Sample output saved to {output_path}")
        else:
            print("Preprocessing failed or resulted in an empty DataFrame.")
    except (FileNotFoundError, ConnectionError) as e:
        logging.error("Failed to run preprocessing: %s", e)