import logging

import pandas as pd
import numpy as np
from scipy.stats import median_abs_deviation

from .data import get_db_connection

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def _convert_to_minutes(level: float) -> float:
    """
    Converts O*NET 'Frequency' scale values (levels) to estimated minutes per day.

    This logic is derived from the `preprocessing_time_estimates` function
    in the original analysis notebook.
    """
    if pd.isna(level):
        return 0
    # This mapping is an interpretation of the O*NET frequency scale.
    return {
        1: 0,    # Yearly or less
        2: 2,    # Several times a year
        3: 10,   # Several times a month
        4: 30,   # Several times a week
        5: 120,  # Daily
        6: 240,  # Several times a day
        7: 480,  # Hourly or more
    }.get(int(level), 0)
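
# Worked example of the mapping above (the minute values are an interpretation
# of the O*NET frequency scale, not an official conversion):
#
#   _convert_to_minutes(4)             # -> 30  ("Several times a week")
#   _convert_to_minutes(6)             # -> 240 ("Several times a day")
#   _convert_to_minutes(float("nan"))  # -> 0   (missing ratings add no time)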


def _mad_z_score(series: pd.Series) -> pd.Series:
    """
    Calculates the robust Z-score using Median Absolute Deviation (MAD).

    This function is derived from 'cell7' of the original analysis.
    """
    if series.isnull().all():
        return pd.Series([np.nan] * len(series), index=series.index)

    median = series.median()
    # scale='normal' makes MAD comparable to the standard deviation for a normal distribution.
    mad = median_abs_deviation(series.dropna(), scale='normal')
    if mad == 0:
        return pd.Series([np.nan] * len(series), index=series.index)
    return (series - median) / mad
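
# For reference, the score computed above is
#
#     z_robust = (x - median(x)) / (1.4826 * MAD(x)),  MAD(x) = median(|x - median(x)|)
#
# where scale='normal' folds the ~1.4826 factor into scipy's result. E.g. for
# x = [1, 2, 3, 4, 100]: median = 3, raw MAD = 1, so the outlier scores
# (100 - 3) / 1.4826 ~= 65.4, whereas the standard z-score (~1.8 here) is
# dragged down because the outlier inflates the mean and standard deviation.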


def run_preprocessing() -> pd.DataFrame:
    """
    Main orchestrator for the preprocessing pipeline.

    This function faithfully reproduces the data transformation pipeline from the
    original `analysis.py` script, including the `preprocessing_time_estimates`
    and cell-specific data manipulations.

    Returns:
        pd.DataFrame: A fully preprocessed DataFrame ready for the generators.
    """
    logging.info("Starting data preprocessing...")
    conn = None
    try:
        conn = get_db_connection()
        if conn is None:
            raise ConnectionError("Could not establish database connection.")

        # --- 1. Load Data from Database ---
        # Fetch all necessary tables to build the initial DataFrame.
        logging.info("Loading data from O*NET database...")
        task_ratings_df = pd.read_sql_query("SELECT * FROM task_ratings", conn)
        task_statements_df = pd.read_sql_query("SELECT * FROM task_statements", conn)
        occupations_df = pd.read_sql_query("SELECT * FROM occupation_data", conn)
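
        # Downstream steps assume these tables expose at least 'onetsoc_code'
        # and 'task_id' (the join keys), plus 'scale_id', 'lower_ci_bound'
        # and 'upper_ci_bound' on task_ratings; all other columns are carried
        # along unchanged for the generators.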

        # --- 2. Initial Merge ---
        # Merge the tables to create a comprehensive base DataFrame.
        # Merging on both 'onetsoc_code' and 'task_id' is crucial to avoid
        # creating duplicate columns from the overlapping 'onetsoc_code'.
        logging.info("Merging base tables...")
        tasks_df = pd.merge(task_ratings_df, task_statements_df, on=['onetsoc_code', 'task_id'])
        tasks_df = pd.merge(tasks_df, occupations_df, on='onetsoc_code')

        # --- 3. Create "Atomic Tasks" and Time Estimates (from `preprocessing_time_estimates`) ---
        # This is the core of the analysis, focusing on tasks with frequency ratings.
        logging.info("Filtering for 'atomic tasks' (scale_id='FR') and calculating time estimates...")
        # Strip whitespace from scale_id to ensure the filter works correctly.
        tasks_df['scale_id'] = tasks_df['scale_id'].str.strip()
        atomic_tasks = tasks_df[tasks_df['scale_id'] == 'FR'].copy()

        # Convert frequency confidence intervals into minutes/day.
        atomic_tasks['lb_estimate_in_minutes'] = atomic_tasks['lower_ci_bound'].apply(_convert_to_minutes)
        atomic_tasks['ub_estimate_in_minutes'] = atomic_tasks['upper_ci_bound'].apply(_convert_to_minutes)
        atomic_tasks['estimate_midpoint'] = (atomic_tasks['lb_estimate_in_minutes'] + atomic_tasks['ub_estimate_in_minutes']) / 2

        # --- 4. Add Derived Columns for Analysis (from `cell` logic) ---
        logging.info("Adding derived columns for analysis...")

        # Add `onetsoc_major` for grouping by occupation category.
        atomic_tasks['onetsoc_major'] = atomic_tasks['onetsoc_code'].str[:2]

        # Calculate estimate_range and estimate_ratio used in several plots.
        atomic_tasks['estimate_range'] = atomic_tasks['ub_estimate_in_minutes'] - atomic_tasks['lb_estimate_in_minutes']

        # To calculate the ratio, ensure the lower bound is positive to avoid
        # division by zero; all other rows keep NaN.
        lb_positive = atomic_tasks['lb_estimate_in_minutes'] > 0
        atomic_tasks['estimate_ratio'] = np.nan
        atomic_tasks.loc[lb_positive, 'estimate_ratio'] = (
            atomic_tasks.loc[lb_positive, 'ub_estimate_in_minutes']
            / atomic_tasks.loc[lb_positive, 'lb_estimate_in_minutes']
        )
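
        # Worked example: a CI spanning levels 4-6 maps to lb = 30 and
        # ub = 240 minutes/day, giving estimate_range = 210 and
        # estimate_ratio = 240 / 30 = 8.0; a row with lb = 0 keeps
        # estimate_ratio = NaN rather than dividing by zero.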

        # --- 5. Calculate Outlier Scores (from `cell6` and `cell7`) ---
        logging.info("Calculating standard and robust Z-scores for outlier detection...")

        # Standard Z-score
        grouped_stats = atomic_tasks.groupby('onetsoc_code')['estimate_midpoint'].agg(['mean', 'std'])
        atomic_tasks = atomic_tasks.merge(grouped_stats, on='onetsoc_code', how='left')

        # Calculate the Z-score, avoiding division by zero if std is 0 or NaN.
        non_zero_std = atomic_tasks['std'].notna() & (atomic_tasks['std'] != 0)
        atomic_tasks['z_score'] = np.nan
        atomic_tasks.loc[non_zero_std, 'z_score'] = (
            (atomic_tasks.loc[non_zero_std, 'estimate_midpoint'] - atomic_tasks.loc[non_zero_std, 'mean'])
            / atomic_tasks.loc[non_zero_std, 'std']
        )
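
        # That is, z = (estimate_midpoint - mean) / std within each occupation;
        # rows whose occupation has zero or undefined spread stay NaN.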

        # Robust Z-score (using MAD), computed within each occupation.
        atomic_tasks['robust_z_score'] = atomic_tasks.groupby('onetsoc_code')['estimate_midpoint'].transform(_mad_z_score)
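
        # Unlike the standard z-score above, the MAD-based score uses the
        # per-occupation median and MAD, so a handful of extreme time
        # estimates cannot inflate the spread they are measured against
        # (see _mad_z_score for the formula).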

        # --- 6. Prepare for other generators ---
        # NOTE: The data for the 'task_breakdown_by_occupation' generator, specifically
        # the 'remote_status' and 'estimateable' columns, is not available in the O*NET
        # database. This data was likely loaded from a separate file (e.g., 'tasks_clean.parquet')
        # in the original notebook. For now, we add placeholder columns.
        atomic_tasks['remote_status'] = 'unknown'
        atomic_tasks['estimateable'] = 'unknown'

        logging.info("Data preprocessing complete.")
        return atomic_tasks

    except Exception as e:
        logging.error("An error occurred during preprocessing: %s", e, exc_info=True)
        # Return an empty DataFrame on failure to prevent downstream errors.
        return pd.DataFrame()
    finally:
        if conn:
            conn.close()
            logging.info("Database connection closed.")


if __name__ == '__main__':
    # This allows the preprocessing to be run directly for testing or debugging.
    # Note: requires the data to be set up first by running data.py.
    try:
        processed_data = run_preprocessing()
        if not processed_data.empty:
            print("Preprocessing successful. DataFrame shape:", processed_data.shape)
            print("Columns:", processed_data.columns.tolist())
            print(processed_data.head())

            # Save to a temporary file to inspect the output.
            output_path = "temp_preprocessed_data.csv"
            processed_data.to_csv(output_path, index=False)
            print(f"Sample output saved to {output_path}")
        else:
            print("Preprocessing failed or resulted in an empty DataFrame.")
    except (FileNotFoundError, ConnectionError) as e:
        logging.error("Failed to run preprocessing: %s", e)