# sprint-econtai/analysis/preprocess.py
# (last commit: Félix Dorn, 43076bcbb1, 2025-07-15 00:41:05 +02:00)
import logging
import pandas as pd
import numpy as np
from scipy.stats import median_abs_deviation
from .data import get_db_connection
# Configure logging at import time (module-level side effect: sets the root
# logger's level and message format for every consumer of this package).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def _convert_to_minutes(level: float) -> float:
"""
Converts O*NET 'Frequency' scale values (levels) to estimated minutes per day.
This logic is derived from the `preprocessing_time_estimates` function
in the original analysis notebook.
"""
if pd.isna(level):
return 0
# This mapping is an interpretation of the O*NET frequency scale.
return {
1: 0, # Yearly or less
2: 2, # Several times a year
3: 10, # Several times a month
4: 30, # Several times a week
5: 120, # Daily
6: 240, # Several times a day
7: 480, # Hourly or more
}.get(int(level), 0)
def _mad_z_score(series: pd.Series) -> pd.Series:
"""
Calculates the robust Z-score using Median Absolute Deviation (MAD).
This function is derived from 'cell7' of the original analysis.
"""
if series.isnull().all():
return pd.Series([np.nan] * len(series), index=series.index)
median = series.median()
# scale='normal' makes MAD comparable to the standard deviation for a normal distribution.
mad = median_abs_deviation(series.dropna(), scale='normal')
if mad == 0:
return pd.Series([np.nan] * len(series), index=series.index)
return (series - median) / mad
def run_preprocessing() -> pd.DataFrame:
    """
    Main orchestrator for the preprocessing pipeline.

    Faithfully reproduces the data transformation pipeline from the original
    `analysis.py` script, including `preprocessing_time_estimates` and the
    cell-specific data manipulations.

    Returns:
        pd.DataFrame: fully preprocessed task-level data for the generators;
        an empty DataFrame if any step fails.
    """
    logging.info("Starting data preprocessing...")
    conn = None
    try:
        conn = get_db_connection()
        if conn is None:
            raise ConnectionError("Could not establish database connection.")

        # --- 1. Load Data from Database ---
        # Fetch every table needed to assemble the base DataFrame.
        logging.info("Loading data from O*NET database...")
        ratings = pd.read_sql_query("SELECT * FROM task_ratings", conn)
        statements = pd.read_sql_query("SELECT * FROM task_statements", conn)
        occupations = pd.read_sql_query("SELECT * FROM occupation_data", conn)

        # --- 2. Initial Merge ---
        # Joining on BOTH 'onetsoc_code' and 'task_id' avoids duplicate
        # columns from the overlapping 'onetsoc_code' key.
        logging.info("Merging base tables...")
        tasks = ratings.merge(statements, on=['onetsoc_code', 'task_id'])
        tasks = tasks.merge(occupations, on='onetsoc_code')

        # --- 3. "Atomic Tasks" and Time Estimates (`preprocessing_time_estimates`) ---
        # The analysis centers on tasks that carry frequency ratings.
        logging.info("Filtering for 'atomic tasks' (scale_id='FR') and calculating time estimates...")
        # Whitespace around scale_id would silently defeat the equality filter.
        tasks['scale_id'] = tasks['scale_id'].str.strip()
        atomic = tasks.loc[tasks['scale_id'] == 'FR'].copy()

        # Frequency confidence-interval bounds become minutes/day estimates.
        atomic['lb_estimate_in_minutes'] = atomic['lower_ci_bound'].apply(_convert_to_minutes)
        atomic['ub_estimate_in_minutes'] = atomic['upper_ci_bound'].apply(_convert_to_minutes)
        atomic['estimate_midpoint'] = (
            atomic['lb_estimate_in_minutes'] + atomic['ub_estimate_in_minutes']
        ) / 2

        # --- 4. Derived Columns for Analysis (`cell` logic) ---
        logging.info("Adding derived columns for analysis...")
        # Major occupation group = first two digits of the SOC code.
        atomic['onetsoc_major'] = atomic['onetsoc_code'].str[:2]
        atomic['estimate_range'] = atomic['ub_estimate_in_minutes'] - atomic['lb_estimate_in_minutes']
        # Ratio is only defined where the lower bound is strictly positive
        # (guards division by zero); everywhere else it stays NaN.
        positive_lb = atomic['lb_estimate_in_minutes'] > 0
        atomic['estimate_ratio'] = np.nan
        atomic.loc[positive_lb, 'estimate_ratio'] = (
            atomic['ub_estimate_in_minutes'] / atomic['lb_estimate_in_minutes']
        )

        # --- 5. Outlier Scores (`cell6` and `cell7`) ---
        logging.info("Calculating standard and robust Z-scores for outlier detection...")
        # Standard Z-score, computed per occupation.
        per_occ = atomic.groupby('onetsoc_code')['estimate_midpoint'].agg(['mean', 'std'])
        atomic = atomic.merge(per_occ, on='onetsoc_code', how='left')
        # Only rows with a defined, non-zero std get a Z-score.
        usable_std = atomic['std'].notna() & (atomic['std'] != 0)
        atomic['z_score'] = np.nan
        atomic.loc[usable_std, 'z_score'] = (
            atomic.loc[usable_std, 'estimate_midpoint'] - atomic.loc[usable_std, 'mean']
        ) / atomic.loc[usable_std, 'std']
        # Robust Z-score via MAD, also per occupation.
        atomic['robust_z_score'] = (
            atomic.groupby('onetsoc_code')['estimate_midpoint'].transform(_mad_z_score)
        )

        # --- 6. Prepare for other generators ---
        # NOTE: 'remote_status' and 'estimateable' (used by the
        # 'task_breakdown_by_occupation' generator) are not in the O*NET
        # database — they likely came from a separate file such as
        # 'tasks_clean.parquet'. Placeholders keep the schema stable.
        atomic['remote_status'] = 'unknown'
        atomic['estimateable'] = 'unknown'

        logging.info("Data preprocessing complete.")
        return atomic
    except Exception as e:
        logging.error("An error occurred during preprocessing: %s", e, exc_info=True)
        # An empty DataFrame (rather than None/raise) shields downstream code.
        return pd.DataFrame()
    finally:
        if conn:
            conn.close()
            logging.info("Database connection closed.")
if __name__ == '__main__':
    # Allows running the preprocessing directly for testing or debugging.
    # Note: requires the database to be set up first by running data.py.
    try:
        frame = run_preprocessing()
        if frame.empty:
            print("Preprocessing failed or resulted in an empty DataFrame.")
        else:
            print("Preprocessing successful. DataFrame shape:", frame.shape)
            print("Columns:", frame.columns.tolist())
            print(frame.head())
            # Dump a sample to disk so the output can be inspected by hand.
            output_path = "temp_preprocessed_data.csv"
            frame.to_csv(output_path, index=False)
            print(f"Sample output saved to {output_path}")
    except (FileNotFoundError, ConnectionError) as e:
        logging.error("Failed to run preprocessing: %s", e)