This commit is contained in:
Félix Dorn 2025-07-03 16:26:30 +02:00
commit 2da206d368
17 changed files with 955 additions and 0 deletions

35
pipeline/constants.py Normal file
View file

@ -0,0 +1,35 @@
OCCUPATION_MAJOR_CODES = {
'11': 'Management',
'13': 'Business & Financial',
'15': 'Computer & Mathematical',
'17': 'Architecture & Engineering',
'19': 'Life, Physical, & Social Science',
'21': 'Community & Social Service',
'23': 'Legal',
'25': 'Education, Training, & Library',
'27': 'Arts, Design, & Media',
'29': 'Healthcare Practitioners',
'31': 'Healthcare Support',
'33': 'Protective Service',
'35': 'Food Preparation & Serving',
'37': 'Building & Grounds Maintenance',
'39': 'Personal Care & Service',
'41': 'Sales & Related',
'43': 'Office & Admin Support',
'45': 'Farming, Fishing, & Forestry',
'47': 'Construction & Extraction',
'49': 'Installation, Maintenance, & Repair',
'51': 'Production',
'53': 'Transportation & Material Moving',
'55': 'Military Specific',
}
GRAY = {'50':'#f8fafc','100':'#f1f5f9','200':'#e2e8f0',
'300':'#cbd5e1','400':'#94a3b8','500':'#64748b',
'600':'#475569','700':'#334155','800':'#1e293b',
'900':'#0f172a','950':'#020617'}
LIME = {'50': '#f7fee7','100': '#ecfcca','200': '#d8f999',
'300': '#bbf451','400': '#9ae600','500': '#83cd00',
'600': '#64a400','700': '#497d00','800': '#3c6300',
'900': '#35530e','950': '#192e03'}

13
pipeline/enrichments.py Normal file
View file

@ -0,0 +1,13 @@
"""
This module enriches data, they take time to run, and are usually expensive (API calls...),
they should manage their own state, and only be run if the data's version is different than
their save.
"""
from .run import Run
import pandas as pd
def enrich_with_task_estimateability(run: Run) -> pd.DataFrame:
raise NotImplementedError
def enrich_with_task_estimates(run: Run) -> pd.DataFrame:
raise NotImplementedError

17
pipeline/fetchers.py Normal file
View file

@ -0,0 +1,17 @@
"""
Fetchers retrieve remote data and return it in a format suitable for further processing, they also return its version, which should be considered opaque, though it is usually a checksum.
"""
import sqlite3
from typing import Tuple
import pandas as pd
from .metadata import Metadata
def fetch_onet_database(meta: Metadata) -> Tuple[sqlite3.Connection, str]:
raise NotImplementedError
def fetch_oesm_data(meta: Metadata) -> Tuple[pd.DataFrame, str]:
raise NotImplementedError
def fetch_epoch_remote_data(meta: Metadata) -> Tuple[pd.DataFrame, str]:
raise NotImplementedError

View file

@ -0,0 +1,5 @@
from .estimate_histplot import generate_estimate_histplot
GENERATORS = [
generate_estimate_histplot
]

View file

@ -0,0 +1,6 @@
from ..run import Run
from pathlib import Path
from typing import Generator
def generate_estimate_histplot(run: Run, output_dir: Path) -> Generator[Path]:
raise NotImplementedError

View file

@ -0,0 +1,6 @@
import pandas as pd
from typings import List
def must_have_columns(df: pd.DataFrame, columns: List[str]):
if not all(col in df.columns for col in columns):
raise ValueError(f"DataFrame is missing required columns: {columns}")

28
pipeline/metadata.py Normal file
View file

@ -0,0 +1,28 @@
"""
This module defines the Metadata model for the pipeline.
"""
from datetime import datetime
from pydantic import BaseModel, Field
from typing import Dict, Any
class Metadata(BaseModel):
"""
A Pydantic model for storing pipeline metadata.
This class is intended to be instantiated once and passed through the
pipeline. Each step in the pipeline can then add its own metadata.
This provides a centralized and structured way to track data provenance,
versions, and other important information.
"""
fetchers: Dict[str, Dict[str, Any]] = Field(default_factory=dict)
ts: str = Field(default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
commit: str = Field(default_factory=lambda: _get_current_commit())
def _get_current_commit() -> str:
"""
Returns the current git commit hash, "unknown", or "errored" depending on why the commit could not be retrieved.
"""
raise NotImplementedError

View file

@ -0,0 +1,10 @@
from .run import Run
def check_for_insanity(run: Run) -> Run:
raise NotImplementedError
def create_df_tasks(run: Run) -> Run:
"""
df_tasks are tasks that are remote-able, estimateable
Add lb_estimate_in_minutes, ub_estimate_in_minutes, estimate_range, estimate_ratio, estimate_midpoint as columns
"""
raise NotImplementedError

22
pipeline/run.py Normal file
View file

@ -0,0 +1,22 @@
from pydantic import BaseModel, Field
import sqlite3
import pandas as pd
from typing import Optional
from .metadata import Metadata
class Run(BaseModel):
# === FETCHERS ===
onet_conn: Optional[sqlite3.Connection] = None
onet_version: Optional[str] = None
oesm_df: Optional[pd.DataFrame] = None
oesm_version: Optional[str] = None
epoch_df: Optional[pd.DataFrame] = None
epoch_version: Optional[str] = None
# === ENRICHMENTS ===
task_estimateability_df: Optional[pd.DataFrame] = None
task_estimates_df: Optional[pd.DataFrame] = None
meta: Metadata = Field(default_factory=Metadata)

53
pipeline/runner.py Normal file
View file

@ -0,0 +1,53 @@
from dotenv import load_dotenv
from .fetchers import fetch_oesm_data, fetch_epoch_remote_data, fetch_onet_database
from .enrichments import enrich_with_task_estimateability, enrich_with_task_estimates
from .postprocessors import check_for_insanity, create_df_tasks
from .generators import GENERATORS
from .run import Run
from .constants import GRAY
import seaborn as sns
import matplotlib as mpl
from pathlib import Path
from typings import Optional
def run(output_dir: Optional[str] = None):
if output_dir is None:
output_dir = Path(".")
load_dotenv()
_setup_graph_rendering()
current_run = Run()
# Fetchers (fetchers.py)
current_run.onet_conn, current_run.onet_version = fetch_onet_database(current_run.meta)
current_run.oesm_df, current_run.oesm_version = fetch_oesm_data(current_run.meta)
current_run.epoch_df, current_run.epoch_version = fetch_epoch_remote_data(current_run.meta)
# Enrichments (enrichments.py)
current_run.task_estimateability_df = enrich_with_task_estimateability(current_run)
current_run.task_estimates_df = enrich_with_task_estimates(current_run)
# Postprocessors (postprocessors.py)
create_df_tasks(current_run)
check_for_insanity(current_run)
# Generators (generators/)
for gen in GENERATORS:
gen(current_run, output_dir)
def _setup_graph_rendering():
mpl.rcParams.update({
'figure.facecolor' : GRAY['50'],
'axes.facecolor' : GRAY['50'],
'axes.edgecolor' : GRAY['100'],
'axes.labelcolor' : GRAY['700'],
'xtick.color' : GRAY['700'],
'ytick.color' : GRAY['700'],
'font.family' : 'Inter',
'font.size' : 11,
})
sns.set_style("white")