from .run import Run
from .logger import logger

import pandas as pd
import numpy as np


def check_for_insanity(run: Run) -> Run:
    raise NotImplementedError


def create_df_tasks(run: Run) -> Run:
    """
    Creates a dataframe of tasks from the O*NET database and merges it with
    remote-status data.

    This replicates the logic from old/enrich_task_ratings.py and parts of
    old/analysis.py. The resulting dataframe, `run.df_tasks`, is used by the
    enrichment steps.
    """
    logger.info("Creating tasks dataframe")

    cache_path = run.cache_dir / f"onet_{run.onet_version}_tasks_with_remote_status.parquet"
    if cache_path.exists():
        logger.info(f"Loading cached tasks dataframe from {cache_path}")
        run.df_tasks = pd.read_parquet(cache_path)
        return run

    # One row per (rating, DWA) pair; the LEFT JOINs keep tasks without DWAs.
    query = """
    SELECT
        tr.onetsoc_code,
        tr.task_id,
        ts.task,
        od.title AS occupation_title,
        od.description AS occupation_description,
        tr.scale_id,
        tr.category,
        tr.data_value,
        dr.dwa_title
    FROM task_ratings tr
    JOIN task_statements ts ON tr.task_id = ts.task_id
    JOIN occupation_data od ON tr.onetsoc_code = od.onetsoc_code
    LEFT JOIN tasks_to_dwas td
        ON tr.onetsoc_code = td.onetsoc_code AND tr.task_id = td.task_id
    LEFT JOIN dwa_reference dr ON td.dwa_id = dr.dwa_id;
    """
    df = pd.read_sql_query(query, run.onet_conn)
    logger.info(f"Fetched {len(df)} records (including DWA info) from the database.")

    # Separate ratings from DWAs
    core_cols = [
        "onetsoc_code",
        "task_id",
        "task",
        "occupation_title",
        "occupation_description",
        "scale_id",
        "category",
        "data_value",
    ]
    ratings_df = df[core_cols].drop_duplicates().reset_index(drop=True)

    dwa_cols = ["onetsoc_code", "task_id", "dwa_title"]
    dwas_df = df[dwa_cols].dropna(subset=["dwa_title"]).drop_duplicates().reset_index(drop=True)

    # 1. Handle Frequency (FT): one column per frequency category
    logger.info("Processing Frequency data")
    freq_df = ratings_df[ratings_df["scale_id"] == "FT"].copy()
    if not freq_df.empty:
        freq_pivot = freq_df.pivot_table(
            index=["onetsoc_code", "task_id"],
            columns="category",
            values="data_value",
            fill_value=0,
        )
        freq_pivot.columns = [f"frequency_category_{int(col)}" for col in freq_pivot.columns]
    else:
        # Empty placeholder with the same index shape so step 6 can merge unconditionally
        idx = pd.MultiIndex(levels=[[], []], codes=[[], []], names=["onetsoc_code", "task_id"])
        freq_pivot = pd.DataFrame(index=idx)

    # 2. Handle Importance (IM, IJ): average across the two importance scales
    logger.info("Processing Importance data")
    imp_df = ratings_df[ratings_df["scale_id"].isin(["IM", "IJ"])].copy()
    if not imp_df.empty:
        imp_avg = imp_df.groupby(["onetsoc_code", "task_id"])["data_value"].mean().reset_index()
        imp_avg.rename(columns={"data_value": "importance_average"}, inplace=True)
    else:
        imp_avg = pd.DataFrame(columns=["onetsoc_code", "task_id", "importance_average"])

    # 3. Handle Relevance (RT)
    logger.info("Processing Relevance data")
    rel_df = ratings_df[ratings_df["scale_id"] == "RT"].copy()
    if not rel_df.empty:
        rel_avg = rel_df.groupby(["onetsoc_code", "task_id"])["data_value"].mean().reset_index()
        rel_avg.rename(columns={"data_value": "relevance_average"}, inplace=True)
    else:
        rel_avg = pd.DataFrame(columns=["onetsoc_code", "task_id", "relevance_average"])

    # 4. Process DWAs: collect each task's DWA titles into a list
    logger.info("Processing DWA data")
    if not dwas_df.empty:
        dwas_grouped = dwas_df.groupby(["onetsoc_code", "task_id"])["dwa_title"].apply(list).reset_index()
        dwas_grouped.rename(columns={"dwa_title": "dwas"}, inplace=True)
    else:
        dwas_grouped = None

    # 5. Get Base Task/Occupation Info
    logger.info("Extracting base task/occupation info")
    base_cols = ["onetsoc_code", "task_id", "task", "occupation_title", "occupation_description"]
    base_info = ratings_df[base_cols].drop_duplicates().set_index(["onetsoc_code", "task_id"])
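    # At this point every intermediate frame is keyed on (onetsoc_code, task_id):
    # freq_pivot holds one frequency_category_<n> column per FT category,
    # imp_avg and rel_avg each hold a single averaged score, and dwas_grouped
    # maps each task to the list of its DWA titles. Step 6 stitches them together.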
    # 6. Merge Processed ONET Data
    logger.info("Merging processed ONET data")
    final_df = base_info.merge(freq_pivot, left_index=True, right_index=True, how="left")
    final_df = final_df.reset_index()

    if not imp_avg.empty:
        final_df = final_df.merge(imp_avg, on=["onetsoc_code", "task_id"], how="left")
    else:
        final_df["importance_average"] = np.nan

    if not rel_avg.empty:
        final_df = final_df.merge(rel_avg, on=["onetsoc_code", "task_id"], how="left")
    else:
        final_df["relevance_average"] = np.nan

    if dwas_grouped is not None and not dwas_grouped.empty:
        final_df = final_df.merge(dwas_grouped, on=["onetsoc_code", "task_id"], how="left")
        if "dwas" in final_df.columns:
            final_df["dwas"] = final_df["dwas"].apply(lambda x: x if isinstance(x, list) else [])
    else:
        final_df["dwas"] = [[] for _ in range(len(final_df))]

    final_df = final_df.replace({np.nan: None})

    # 7. Merge with EPOCH remote data
    logger.info("Merging with EPOCH remote data")
    final_df = pd.merge(
        final_df,
        run.epoch_df[["Task", "Remote"]],
        left_on="task",
        right_on="Task",
        how="left",
    )
    final_df = final_df.drop("Task", axis=1).rename(columns={"Remote": "remote_status"})

    logger.info(f"Created tasks dataframe with shape {final_df.shape}")
    final_df.to_parquet(cache_path)
    run.df_tasks = final_df
    return run
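
# A minimal usage sketch (illustrative only). It assumes the surrounding
# pipeline builds a `Run` with `cache_dir`, `onet_version`, an open O*NET
# database connection in `onet_conn`, and the EPOCH remote-work dataframe in
# `epoch_df`; the construction below is hypothetical, not part of this module:
#
#     run = Run(...)  # hypothetical: wired up by the pipeline entrypoint
#     run = create_df_tasks(run)
#     print(run.df_tasks[["task", "importance_average", "remote_status"]].head())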