sprint-econtai/pipeline/runner.py
Félix Dorn 65dc648797 wip
2025-07-15 00:34:54 +02:00

215 lines
10 KiB
Python

import sqlite3
import os
from .logger import logger
import pandas as pd
from dotenv import load_dotenv
from .fetchers import fetch_onet_database, fetch_oesm_data, fetch_epoch_remote_data, ONET_VERSION, fetch_metr_data
from .classification import classify_tasks_as_estimable, generate_time_estimates_for_tasks
from .generators import GENERATORS
from .aggregate import create_task_summary_by_occupation_df, aggregate_task_summary_by_major_code
from .utils import convert_to_minutes
import argparse
import platformdirs
import numpy as np
from pathlib import Path
class Runner:
onet_conn: sqlite3.Connection
oesm_df: pd.DataFrame
epoch_df: pd.DataFrame
metr_results: dict
def __init__(self, output_dir: Path | str, debug: bool, bust_estimability: bool, bust_estimates: bool):
if isinstance(output_dir, str):
output_dir = Path(output_dir).resolve()
output_dir.mkdir(parents=True, exist_ok=True)
self.output_dir = output_dir
self.intermediate_dir = self.output_dir / "intermediate"
self.intermediate_dir.mkdir(parents=True, exist_ok=True)
self.cache_dir = platformdirs.user_cache_path("econtai")
self.debug = debug
self.bust_estimability = bust_estimability
self.bust_estimates = bust_estimates
if debug:
os.environ["LITELLM_LOG"] = os.environ.get("LITELLM_LOG", "INFO")
def run(self):
load_dotenv()
self.onet_conn = fetch_onet_database(self.cache_dir)
self.oesm_df = fetch_oesm_data(self.cache_dir)
self.epoch_df = fetch_epoch_remote_data(self.cache_dir)
self.metr_results = fetch_metr_data(self.cache_dir)
self.df_tasks = self._create_df_tasks()
self.df_tasks['onetsoc_major'] = self.df_tasks['onetsoc_code'].str[:2]
df_to_process = self.df_tasks[
(self.df_tasks['importance_average'] > 3) &
(self.df_tasks['remote_status'] == 'remote')
].copy()
if self.debug:
df_to_process = df_to_process.head(10)
task_estimability_df = classify_tasks_as_estimable(self.cache_dir, df_to_process, bust=self.bust_estimability)
self.df_tasks = pd.merge(self.df_tasks, task_estimability_df, on='task', how='left')
self.df_tasks['estimable'] = self.df_tasks['estimable'].fillna(False)
self.df_tasks.to_parquet(self.intermediate_dir / "df_tasks.parquet")
df_to_process = pd.merge(df_to_process, task_estimability_df, on='task', how='left')
df_to_process['estimable'] = self.df_tasks['estimable'].fillna(False)
df_to_process = df_to_process[df_to_process['estimable']].copy()
task_estimates_df = generate_time_estimates_for_tasks(self.cache_dir, df_to_process, bust=self.bust_estimates)
df = pd.merge(df_to_process, task_estimates_df, on=['onetsoc_code', 'task_id'], how='left')
df['lb_estimate_in_minutes'] = df.apply(lambda row: convert_to_minutes(row['lb_estimate_qty'], row['lb_estimate_unit']), axis=1)
df['ub_estimate_in_minutes'] = df.apply(lambda row: convert_to_minutes(row['ub_estimate_qty'], row['ub_estimate_unit']), axis=1)
df['estimate_range'] = df.ub_estimate_in_minutes - df.lb_estimate_in_minutes
df['estimate_ratio'] = np.divide(df.ub_estimate_in_minutes, df.lb_estimate_in_minutes).replace([np.inf, -np.inf], None)
df['estimate_midpoint'] = (df.lb_estimate_in_minutes + df.ub_estimate_in_minutes) / 2
df.to_parquet(self.intermediate_dir / "estimable_tasks_with_estimates.parquet")
self.task_summary_by_occupation_df = create_task_summary_by_occupation_df(self.df_tasks, self.oesm_df)
self.task_summary_by_occupation_df.to_parquet(self.intermediate_dir / "task_summary_by_occupation.parquet")
self.task_summary_by_major_occupation_df = aggregate_task_summary_by_major_code(self.task_summary_by_occupation_df)
self.task_summary_by_major_occupation_df.to_parquet(self.intermediate_dir / "task_summary_by_major_occupation.parquet")
self._check_for_insanity(df)
for gen in GENERATORS:
for asset in gen(**{
"output_dir": self.output_dir,
"runner": self,
"df": df,
"task_summary_by_occupation_df": self.task_summary_by_occupation_df,
"task_summary_by_major_occupation_df": self.task_summary_by_major_occupation_df,
"df_tasks": self.df_tasks,
"oesm_df": self.oesm_df,
"metr_results": self.metr_results,
}):
logger.info(f"New asset: {asset}")
def _create_df_tasks(self) -> pd.DataFrame:
DATA_PATH = self.cache_dir / f"onet_{ONET_VERSION}_tasks_with_remote_status.parquet"
if DATA_PATH.exists():
logger.info(f"Loading cached tasks dataframe from {DATA_PATH}")
return pd.read_parquet(DATA_PATH)
logger.info("Creating tasks dataframe")
query = """
SELECT
tr.onetsoc_code,
tr.task_id,
ts.task,
od.title AS occupation_title,
od.description AS occupation_description,
tr.scale_id,
tr.category,
tr.data_value
FROM
task_ratings tr
JOIN
task_statements ts ON tr.task_id = ts.task_id
JOIN
occupation_data od ON tr.onetsoc_code = od.onetsoc_code;
"""
ratings_df = pd.read_sql_query(query, self.onet_conn)
logger.info(f"Fetched {len(ratings_df)} task rating records from the database.")
# 1. Handle Frequency (FT)
logger.info("Processing Frequency data")
freq_df = ratings_df[ratings_df["scale_id"] == "FT"].copy()
if not freq_df.empty:
freq_pivot = freq_df.pivot_table(
index=["onetsoc_code", "task_id"],
columns="category",
values="data_value",
fill_value=0,
)
freq_pivot.columns = [f"frequency_category_{int(col)}" for col in freq_pivot.columns]
else:
raise ValueError("No frequency data.")
# 2. Handle Importance (IM, IJ)
logger.info("Processing Importance data")
imp_df = ratings_df[ratings_df["scale_id"].isin(["IM", "IJ"])].copy()
if not imp_df.empty:
imp_avg = imp_df.groupby(["onetsoc_code", "task_id"])["data_value"].mean().reset_index()
imp_avg.rename(columns={"data_value": "importance_average"}, inplace=True)
else:
raise ValueError("No importance data.")
# 3. Handle Relevance (RT)
logger.info("Processing Relevance data")
rel_df = ratings_df[ratings_df["scale_id"] == "RT"].copy()
if not rel_df.empty:
rel_avg = rel_df.groupby(["onetsoc_code", "task_id"])["data_value"].mean().reset_index()
rel_avg.rename(columns={"data_value": "relevance_average"}, inplace=True)
else:
raise ValueError("No relevance data.")
# 5. Get Base Task/Occupation Info
logger.info("Extracting base task/occupation info")
base_cols = ["onetsoc_code", "task_id", "task", "occupation_title", "occupation_description"]
base_info = ratings_df[base_cols].drop_duplicates().set_index(["onetsoc_code", "task_id"])
# 6. Merge Processed ONET Data
logger.info("Merging processed ONET data")
final_df = base_info.merge(freq_pivot, left_index=True, right_index=True, how="left")
final_df = final_df.reset_index()
if not imp_avg.empty:
final_df = final_df.merge(imp_avg, on=["onetsoc_code", "task_id"], how="left")
else:
final_df["importance_average"] = np.nan
if not rel_avg.empty:
final_df = final_df.merge(rel_avg, on=["onetsoc_code", "task_id"], how="left")
else:
final_df["relevance_average"] = np.nan
final_df = final_df.replace({np.nan: None})
# 7. Merge with EPOCH remote data
logger.info("Merging with EPOCH remote data")
final_df = pd.merge(final_df, self.epoch_df[['Task', 'Remote']], left_on='task', right_on='Task', how='left')
final_df = final_df.drop('Task', axis=1).rename(columns={'Remote': 'remote_status'})
logger.info(f"Created tasks dataframe with shape {final_df.shape}")
final_df.to_parquet(DATA_PATH)
return final_df
def _check_for_insanity(self, df: pd.DataFrame):
if df['lb_estimate_in_minutes'].isnull().any():
missing_count = df['lb_estimate_in_minutes'].isnull().sum()
raise ValueError(f"Found {missing_count} atomic tasks with missing 'lb_estimate_in_minutes'.")
if df['ub_estimate_in_minutes'].isnull().any():
missing_count = df['ub_estimate_in_minutes'].isnull().sum()
raise ValueError(f"Found {missing_count} atomic tasks with missing 'ub_estimate_in_minutes'.")
valid_estimates = df.dropna(subset=['lb_estimate_in_minutes', 'ub_estimate_in_minutes'])
impossible_bounds = valid_estimates[
(valid_estimates['lb_estimate_in_minutes'] <= 0) |
(valid_estimates['ub_estimate_in_minutes'] <= 0) |
(valid_estimates['lb_estimate_in_minutes'] > valid_estimates['ub_estimate_in_minutes'])
]
if not impossible_bounds.empty:
raise ValueError(f"Found {len(impossible_bounds)} rows with impossible bounds (e.g., lb > ub or value <= 0).")
if __name__ == "__main__":
    # Command-line entry point: parse the flags, build a Runner and execute it.
    cli = argparse.ArgumentParser(description="Run the econtai pipeline.")
    cli.add_argument(
        "--output-dir",
        type=str,
        default="dist/",
        help="The directory to write output files to.",
    )
    # The two cache-busting switches and the debug switch are plain boolean flags.
    for flag, help_text in (
        ("--bust-estimability", "Bust the saved task estimability classification (EXPENSIVE)"),
        ("--bust-estimates", "Bust the tasks estimates (EXPENSIVE)"),
        ("--debug", "Enable debug mode (e.g., process fewer tasks)."),
    ):
        cli.add_argument(flag, action="store_true", help=help_text)

    options = cli.parse_args()
    pipeline = Runner(
        output_dir=options.output_dir,
        debug=options.debug,
        bust_estimability=options.bust_estimability,
        bust_estimates=options.bust_estimates,
    )
    pipeline.run()