diff --git a/pipeline/enrichments.py b/pipeline/enrichments.py index bb925df..ab12c1e 100644 --- a/pipeline/enrichments.py +++ b/pipeline/enrichments.py @@ -3,26 +3,95 @@ This module enriches data, they take time to run, and are usually expensive (API they should manage their own state, and only be run if the data's version is different than their save. """ -from .run import Run +from .run import Run import pandas as pd +from typing import Any, List, Dict +import litellm + +def enrich( + model: str, + rpm: int, + messages_to_process: List[List[Dict[str, str]]], + schema: Dict[str, Any], + chunk_size: int = 100, +): + # Use litellm.batch_completion + pass def enrich_with_task_estimateability(run: Run) -> pd.DataFrame: - """ - TODO: check run.cache_dir / computed_task_estimateability.parquet, if it exists, load it, return it, and don't compute this + output_path = run.cache_dir / "computed_task_estimateability.parquet" + if output_path.exists(): + print(f"Loading cached task estimateability from {output_path}") + return pd.read_parquet(output_path) - call enrich with the right parameters, save the output to cache dir, - return it - """ - raise NotImplementedError + df_remote_tasks = run.df_tasks[run.df_tasks['remote_status'] == 'remote'].copy() + + # In the old script, we only passed unique tasks to the API + df_unique_tasks = df_remote_tasks.drop_duplicates(subset=['task']) + + + results = enrich( + model="gpt-4.1-mini", + rpm=5000, + messages_to_process=[ + [ + {"role": "system", "content": """ + Judge whether the provided O*NET task is suitable for a time estimate. If it is a single, clearly-bounded activity, typically lasting minutes, hours, or a few days, then clearly yes. If it is a continuous responsibility or behavioural norm with no schedulable duration (e.g., “follow confidentiality rules,” “serve as department head”), then clearly no. + """}, + {"role": "user", "content": f"Task: {row.task}"}, + ] + for row in df_unique_tasks.itertuples() + ], + schema={ + "type": "object", + "properties": {"estimateable": {"type": "bool"}}, + "required": ["estimateable"] + }, + chunk_size=300, + ) + + # Create a new dataframe with just enough information to identify the task uniquely + estimateability classification, save it, return it. Careful: the "task" column in itself is not unique. + return pd.DataFrame() def enrich_with_task_estimates(run: Run) -> pd.DataFrame: - """ - TODO: check run.cache_dir / computed_task_estimates.parquet, if it exists, load it, return it, and don't compute this + output_path = run.cache_dir / "computed_task_estimates.parquet" + if output_path.exists(): + print(f"Loading cached task estimates from {output_path}") + return pd.read_parquet(output_path) - call enrich with the right parameters, save the output to cache dir, - return it - """ - raise NotImplementedError + df = ... # todo -def enrich(model: str, system_prompt: str, schema: Any, rpm: int, chunk_size: int = 100, messages: Any): + results = enrich( + model="gpt-4.1-mini", + rpm=5000, + messages_to_process=[ + [ + {"role": "system", "content": "Estimate the time required to complete the following O*NET task. Your estimate should be a plausible range for how long it might take a typical, qualified worker to perform this task once. Provide your answer as a time range (lower and upper bounds). Do not provide explanations or apologies. If the task is not suitable for a time estimate (e.g., it is an ongoing responsibility), interpret it as a single, schedulable action."}, + {"role": "user", "content": f""" + Task: {row.task} + For Occupation: {row.occupation_title} + Occupation Description: {row.occupation_description}"""} + ] + for row in df.itertuples() + ], + schema={ + "type": "object", + "properties": { + "lower_bound_estimate": { + "type": "object", + "properties": {"quantity": {"type": "number"}, "unit": {"type": "string", "enum": ["minutes", "hours", "days"]}}, + "required": ["quantity", "unit"], + }, + "upper_bound_estimate": { + "type": "object", + "properties": {"quantity": {"type": "number"}, "unit": {"type": "string", "enum": ["minutes", "hours", "days"]}}, + "required": ["quantity", "unit"], + }, + }, + "required": ["lower_bound_estimate", "upper_bound_estimate"], + }, + chunk_size=200, + ) + + # Create a new dataframe with just enough information to identify the task uniquely + the estimates classification, save it, return it. Careful: the "task" column in itself is not unique. raise NotImplementedError diff --git a/pipeline/postprocessors.py b/pipeline/postprocessors.py index 5b9f82a..a390ded 100644 --- a/pipeline/postprocessors.py +++ b/pipeline/postprocessors.py @@ -1,10 +1,140 @@ from .run import Run +from .logger import logger +import pandas as pd +import numpy as np + + def check_for_insanity(run: Run) -> Run: raise NotImplementedError + def create_df_tasks(run: Run) -> Run: """ - df_tasks are tasks that are remote-able, estimateable - Add lb_estimate_in_minutes, ub_estimate_in_minutes, estimate_range, estimate_ratio, estimate_midpoint as columns + Creates a dataframe of tasks from the O*NET database, and merges it with remote status data. + This replicates the logic from old/enrich_task_ratings.py and parts of old/analysis.py + + The resulting dataframe, `run.df_tasks` will be used by the enrichment steps. """ - raise NotImplementedError + logger.info("Creating tasks dataframe") + cache_path = run.cache_dir / f"onet_{run.onet_version}_tasks_with_remote_status.parquet" + if cache_path.exists(): + logger.info(f"Loading cached tasks dataframe from {cache_path}") + run.df_tasks = pd.read_parquet(cache_path) + return run + + query = """ + SELECT + tr.onetsoc_code, + tr.task_id, + ts.task, + od.title AS occupation_title, + od.description AS occupation_description, + tr.scale_id, + tr.category, + tr.data_value, + dr.dwa_title + FROM + task_ratings tr + JOIN + task_statements ts ON tr.task_id = ts.task_id + JOIN + occupation_data od ON tr.onetsoc_code = od.onetsoc_code + LEFT JOIN + tasks_to_dwas td ON tr.onetsoc_code = td.onetsoc_code AND tr.task_id = td.task_id + LEFT JOIN + dwa_reference dr ON td.dwa_id = dr.dwa_id; + """ + df = pd.read_sql_query(query, run.onet_conn) + logger.info(f"Fetched {len(df)} records (including DWA info) from the database.") + + # Separate ratings from DWAs + core_cols = [ + "onetsoc_code", "task_id", "task", "occupation_title", + "occupation_description", "scale_id", "category", "data_value" + ] + ratings_df = df[core_cols].drop_duplicates().reset_index(drop=True) + + dwa_cols = ["onetsoc_code", "task_id", "dwa_title"] + dwas_df = df[dwa_cols].dropna(subset=["dwa_title"]).drop_duplicates().reset_index(drop=True) + + # 1. Handle Frequency (FT) + logger.info("Processing Frequency data") + freq_df = ratings_df[ratings_df["scale_id"] == "FT"].copy() + if not freq_df.empty: + freq_pivot = freq_df.pivot_table( + index=["onetsoc_code", "task_id"], + columns="category", + values="data_value", + fill_value=0, + ) + freq_pivot.columns = [f"frequency_category_{int(col)}" for col in freq_pivot.columns] + else: + idx = pd.MultiIndex(levels=[[], []], codes=[[], []], names=["onetsoc_code", "task_id"]) + freq_pivot = pd.DataFrame(index=idx) + + # 2. Handle Importance (IM, IJ) + logger.info("Processing Importance data") + imp_df = ratings_df[ratings_df["scale_id"].isin(["IM", "IJ"])].copy() + if not imp_df.empty: + imp_avg = imp_df.groupby(["onetsoc_code", "task_id"])["data_value"].mean().reset_index() + imp_avg.rename(columns={"data_value": "importance_average"}, inplace=True) + else: + imp_avg = pd.DataFrame(columns=["onetsoc_code", "task_id", "importance_average"]) + + # 3. Handle Relevance (RT) + logger.info("Processing Relevance data") + rel_df = ratings_df[ratings_df["scale_id"] == "RT"].copy() + if not rel_df.empty: + rel_avg = rel_df.groupby(["onetsoc_code", "task_id"])["data_value"].mean().reset_index() + rel_avg.rename(columns={"data_value": "relevance_average"}, inplace=True) + else: + rel_avg = pd.DataFrame(columns=["onetsoc_code", "task_id", "relevance_average"]) + + # 4. Process DWAs + logger.info("Processing DWA data") + if not dwas_df.empty: + dwas_grouped = dwas_df.groupby(["onetsoc_code", "task_id"])["dwa_title"].apply(list).reset_index() + dwas_grouped.rename(columns={"dwa_title": "dwas"}, inplace=True) + else: + dwas_grouped = None + + # 5. Get Base Task/Occupation Info + logger.info("Extracting base task/occupation info") + base_cols = ["onetsoc_code", "task_id", "task", "occupation_title", "occupation_description"] + base_info = ratings_df[base_cols].drop_duplicates().set_index(["onetsoc_code", "task_id"]) + + # 6. Merge Processed ONET Data + logger.info("Merging processed ONET data") + final_df = base_info.merge(freq_pivot, left_index=True, right_index=True, how="left") + final_df = final_df.reset_index() + + if not imp_avg.empty: + final_df = final_df.merge(imp_avg, on=["onetsoc_code", "task_id"], how="left") + else: + final_df["importance_average"] = np.nan + + if not rel_avg.empty: + final_df = final_df.merge(rel_avg, on=["onetsoc_code", "task_id"], how="left") + else: + final_df["relevance_average"] = np.nan + + if dwas_grouped is not None and not dwas_grouped.empty: + final_df = final_df.merge(dwas_grouped, on=["onetsoc_code", "task_id"], how="left") + if "dwas" in final_df.columns: + final_df["dwas"] = final_df["dwas"].apply(lambda x: x if isinstance(x, list) else []) + else: + final_df["dwas"] = [[] for _ in range(len(final_df))] + + final_df = final_df.replace({np.nan: None}) + + # 7. Merge with EPOCH remote data + logger.info("Merging with EPOCH remote data") + final_df = pd.merge(final_df, run.epoch_df[['Task', 'Remote']], left_on='task', right_on='Task', how='left') + final_df = final_df.drop('Task', axis=1).rename(columns={'Remote': 'remote_status'}) + + + logger.info(f"Created tasks dataframe with shape {final_df.shape}") + final_df.to_parquet(cache_path) + + run.df_tasks = final_df + return run diff --git a/pipeline/runner.py b/pipeline/runner.py index f638f9e..e1b4464 100644 --- a/pipeline/runner.py +++ b/pipeline/runner.py @@ -14,19 +14,18 @@ from typing import Optional CACHE_DIR = platformdirs.user_cache_dir("econtai") -def run(output_dir: Optional[str] = None): +def run(output_dir: Path | Optional[str] = None): load_dotenv() _setup_graph_rendering() if output_dir is None: output_dir = Path("dist/") - else: + elif isinstance(output_dir, str): output_dir = Path(output_dir).resolve() output_dir.mkdir(parents=True, exist_ok=True) - - current_run = Run(output_dir=output_dir, cache_dir=CACHE_DIR) + current_run = Run(output_dir=output_dir, cache_dir=Path(CACHE_DIR).resolve()) current_run.cache_dir.mkdir(parents=True, exist_ok=True) # Fetchers (fetchers.py) @@ -34,12 +33,13 @@ def run(output_dir: Optional[str] = None): current_run.oesm_df, current_run.oesm_version = fetch_oesm_data(current_run) current_run.epoch_df, current_run.epoch_version = fetch_epoch_remote_data(current_run) + current_run = create_df_tasks(current_run) + # Enrichments (enrichments.py) current_run.task_estimateability_df = enrich_with_task_estimateability(current_run) current_run.task_estimates_df = enrich_with_task_estimates(current_run) # Postprocessors (postprocessors.py) - create_df_tasks(current_run) check_for_insanity(current_run) # Generators (generators/)