import argparse
import os
import sqlite3
from pathlib import Path

import numpy as np
import pandas as pd
import platformdirs
from dotenv import load_dotenv

from .aggregate import aggregate_task_summary_by_major_code, create_task_summary_by_occupation_df
from .classification import classify_tasks_as_estimable, generate_time_estimates_for_tasks
from .fetchers import (
    ONET_VERSION,
    fetch_epoch_remote_data,
    fetch_metr_data,
    fetch_oesm_data,
    fetch_onet_database,
)
from .generators import GENERATORS
from .logger import logger
from .utils import convert_to_minutes


class Runner:
    onet_conn: sqlite3.Connection
    oesm_df: pd.DataFrame
    epoch_df: pd.DataFrame
    metr_results: dict

    def __init__(self, output_dir: Path | str, debug: bool, bust_estimability: bool, bust_estimates: bool):
        if isinstance(output_dir, str):
            output_dir = Path(output_dir).resolve()
        output_dir.mkdir(parents=True, exist_ok=True)
        self.output_dir = output_dir

        self.intermediate_dir = self.output_dir / "intermediate"
        self.intermediate_dir.mkdir(parents=True, exist_ok=True)

        self.cache_dir = platformdirs.user_cache_path("econtai")
        self.debug = debug
        self.bust_estimability = bust_estimability
        self.bust_estimates = bust_estimates

        if debug:
            os.environ["LITELLM_LOG"] = os.environ.get("LITELLM_LOG", "INFO")

    def run(self):
        load_dotenv()

        # Fetch source datasets (cached on disk after the first run).
        self.onet_conn = fetch_onet_database(self.cache_dir)
        self.oesm_df = fetch_oesm_data(self.cache_dir)
        self.epoch_df = fetch_epoch_remote_data(self.cache_dir)
        self.metr_results = fetch_metr_data(self.cache_dir)

        self.df_tasks = self._create_df_tasks()
        self.df_tasks['onetsoc_major'] = self.df_tasks['onetsoc_code'].str[:2]

        # Only process tasks that are both important (importance > 3) and remote-friendly.
        df_to_process = self.df_tasks[
            (self.df_tasks['importance_average'] > 3)
            & (self.df_tasks['remote_status'] == 'remote')
        ].copy()

        if self.debug:
            df_to_process = df_to_process.head(10)

        task_estimability_df = classify_tasks_as_estimable(
            self.cache_dir, df_to_process, bust=self.bust_estimability
        )

        self.df_tasks = pd.merge(self.df_tasks, task_estimability_df, on='task', how='left')
        self.df_tasks['estimable'] = self.df_tasks['estimable'].fillna(False)
        self.df_tasks.to_parquet(self.intermediate_dir / "df_tasks.parquet")

        df_to_process = pd.merge(df_to_process, task_estimability_df, on='task', how='left')
        df_to_process['estimable'] = df_to_process['estimable'].fillna(False)
        df_to_process = df_to_process[df_to_process['estimable']].copy()

        task_estimates_df = generate_time_estimates_for_tasks(
            self.cache_dir, df_to_process, bust=self.bust_estimates
        )

        df = pd.merge(df_to_process, task_estimates_df, on=['onetsoc_code', 'task_id'], how='left')

        # Normalize the (quantity, unit) estimates to minutes and derive summary columns.
        df['lb_estimate_in_minutes'] = df.apply(
            lambda row: convert_to_minutes(row['lb_estimate_qty'], row['lb_estimate_unit']), axis=1
        )
        df['ub_estimate_in_minutes'] = df.apply(
            lambda row: convert_to_minutes(row['ub_estimate_qty'], row['ub_estimate_unit']), axis=1
        )
        df['estimate_range'] = df.ub_estimate_in_minutes - df.lb_estimate_in_minutes
        df['estimate_ratio'] = np.divide(
            df.ub_estimate_in_minutes, df.lb_estimate_in_minutes
        ).replace([np.inf, -np.inf], None)
        df['estimate_midpoint'] = (df.lb_estimate_in_minutes + df.ub_estimate_in_minutes) / 2
        df.to_parquet(self.intermediate_dir / "estimable_tasks_with_estimates.parquet")
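        # Worked example of the derived columns above (unit spellings assumed to be
        # ones convert_to_minutes accepts): lb = (30, "minutes"), ub = (2, "hours")
        # yields lb 30 min, ub 120 min, range 90, ratio 4.0, midpoint 75.0.
        # A lower bound of 0 would produce inf, which the replace() turns into None.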
        # Occupation-level rollups, written out for downstream generators.
        self.task_summary_by_occupation_df = create_task_summary_by_occupation_df(self.df_tasks, self.oesm_df)
        self.task_summary_by_occupation_df.to_parquet(self.intermediate_dir / "task_summary_by_occupation.parquet")
        self.task_summary_by_major_occupation_df = aggregate_task_summary_by_major_code(self.task_summary_by_occupation_df)
        self.task_summary_by_major_occupation_df.to_parquet(self.intermediate_dir / "task_summary_by_major_occupation.parquet")

        self._check_for_insanity(df)

        # Each generator receives the full set of intermediate dataframes and yields assets.
        for gen in GENERATORS:
            for asset in gen(
                output_dir=self.output_dir,
                runner=self,
                df=df,
                task_summary_by_occupation_df=self.task_summary_by_occupation_df,
                task_summary_by_major_occupation_df=self.task_summary_by_major_occupation_df,
                df_tasks=self.df_tasks,
                oesm_df=self.oesm_df,
                metr_results=self.metr_results,
            ):
                logger.info(f"New asset: {asset}")

    def _create_df_tasks(self) -> pd.DataFrame:
        DATA_PATH = self.cache_dir / f"onet_{ONET_VERSION}_tasks_with_remote_status.parquet"
        if DATA_PATH.exists():
            logger.info(f"Loading cached tasks dataframe from {DATA_PATH}")
            return pd.read_parquet(DATA_PATH)

        logger.info("Creating tasks dataframe")
        query = """
        SELECT
            tr.onetsoc_code,
            tr.task_id,
            ts.task,
            od.title AS occupation_title,
            od.description AS occupation_description,
            tr.scale_id,
            tr.category,
            tr.data_value
        FROM task_ratings tr
        JOIN task_statements ts ON tr.task_id = ts.task_id
        JOIN occupation_data od ON tr.onetsoc_code = od.onetsoc_code;
        """
        ratings_df = pd.read_sql_query(query, self.onet_conn)
        logger.info(f"Fetched {len(ratings_df)} task rating records from the database.")

        # 1. Handle Frequency (FT): pivot per-category ratings into one column each.
        logger.info("Processing Frequency data")
        freq_df = ratings_df[ratings_df["scale_id"] == "FT"].copy()
        if freq_df.empty:
            raise ValueError("No frequency data.")
        freq_pivot = freq_df.pivot_table(
            index=["onetsoc_code", "task_id"],
            columns="category",
            values="data_value",
            fill_value=0,
        )
        freq_pivot.columns = [f"frequency_category_{int(col)}" for col in freq_pivot.columns]

        # 2. Handle Importance (IM, IJ): average across the two importance scales.
        logger.info("Processing Importance data")
        imp_df = ratings_df[ratings_df["scale_id"].isin(["IM", "IJ"])].copy()
        if imp_df.empty:
            raise ValueError("No importance data.")
        imp_avg = imp_df.groupby(["onetsoc_code", "task_id"])["data_value"].mean().reset_index()
        imp_avg.rename(columns={"data_value": "importance_average"}, inplace=True)

        # 3. Handle Relevance (RT)
        logger.info("Processing Relevance data")
        rel_df = ratings_df[ratings_df["scale_id"] == "RT"].copy()
        if rel_df.empty:
            raise ValueError("No relevance data.")
        rel_avg = rel_df.groupby(["onetsoc_code", "task_id"])["data_value"].mean().reset_index()
        rel_avg.rename(columns={"data_value": "relevance_average"}, inplace=True)

        # 4. Get Base Task/Occupation Info
        logger.info("Extracting base task/occupation info")
        base_cols = ["onetsoc_code", "task_id", "task", "occupation_title", "occupation_description"]
        base_info = ratings_df[base_cols].drop_duplicates().set_index(["onetsoc_code", "task_id"])

        # 5. Merge Processed ONET Data. The empty cases raise above, so the averages
        # are always present here.
        logger.info("Merging processed ONET data")
        final_df = base_info.merge(freq_pivot, left_index=True, right_index=True, how="left")
        final_df = final_df.reset_index()
        final_df = final_df.merge(imp_avg, on=["onetsoc_code", "task_id"], how="left")
        final_df = final_df.merge(rel_avg, on=["onetsoc_code", "task_id"], how="left")
        final_df = final_df.replace({np.nan: None})
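        # The EPOCH dataframe is assumed (from its use here and in run()) to carry one
        # row per task statement, with a 'Task' text column and a 'Remote' label whose
        # values include 'remote'; the merge below keys on the exact task text.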
        # 6. Merge with EPOCH remote data
        logger.info("Merging with EPOCH remote data")
        final_df = pd.merge(
            final_df,
            self.epoch_df[['Task', 'Remote']],
            left_on='task',
            right_on='Task',
            how='left',
        )
        final_df = final_df.drop('Task', axis=1).rename(columns={'Remote': 'remote_status'})

        logger.info(f"Created tasks dataframe with shape {final_df.shape}")
        final_df.to_parquet(DATA_PATH)
        return final_df

    def _check_for_insanity(self, df: pd.DataFrame):
        # Fail fast if any estimate is missing or logically impossible.
        if df['lb_estimate_in_minutes'].isnull().any():
            missing_count = df['lb_estimate_in_minutes'].isnull().sum()
            raise ValueError(f"Found {missing_count} atomic tasks with missing 'lb_estimate_in_minutes'.")
        if df['ub_estimate_in_minutes'].isnull().any():
            missing_count = df['ub_estimate_in_minutes'].isnull().sum()
            raise ValueError(f"Found {missing_count} atomic tasks with missing 'ub_estimate_in_minutes'.")

        valid_estimates = df.dropna(subset=['lb_estimate_in_minutes', 'ub_estimate_in_minutes'])
        impossible_bounds = valid_estimates[
            (valid_estimates['lb_estimate_in_minutes'] <= 0)
            | (valid_estimates['ub_estimate_in_minutes'] <= 0)
            | (valid_estimates['lb_estimate_in_minutes'] > valid_estimates['ub_estimate_in_minutes'])
        ]
        if not impossible_bounds.empty:
            raise ValueError(
                f"Found {len(impossible_bounds)} rows with impossible bounds (e.g., lb > ub or value <= 0)."
            )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run the econtai pipeline.")
    parser.add_argument("--output-dir", type=str, default="dist/", help="The directory to write output files to.")
    parser.add_argument("--bust-estimability", action="store_true", help="Bust the saved task estimability classification (EXPENSIVE)")
    parser.add_argument("--bust-estimates", action="store_true", help="Bust the saved task estimates (EXPENSIVE)")
    parser.add_argument("--debug", action="store_true", help="Enable debug mode (e.g., process fewer tasks).")
    args = parser.parse_args()

    Runner(
        output_dir=args.output_dir,
        debug=args.debug,
        bust_estimability=args.bust_estimability,
        bust_estimates=args.bust_estimates,
    ).run()
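
# Example invocations (module path assumed; this file uses relative imports, so it
# must be run as part of its package, e.g. if it lives at econtai/run.py):
#   python -m econtai.run --output-dir dist/ --debug
#   python -m econtai.run --bust-estimates   # regenerate cached time estimates (expensive)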