sprint-econtai/pipeline/fetchers.py
2025-07-03 17:32:41 +02:00

152 lines
5.6 KiB
Python

"""
Fetchers retrieve remote data and return it in a format suitable for further processing, they also return its version, which should be considered opaque, though it is usually a checksum.
"""
import sqlite3
from typing import Tuple
import pandas as pd
import requests
import hashlib
import io
import zipfile
from .run import Run
from .logger import logger
def fetch_onet_database(run: Run) -> Tuple[sqlite3.Connection, str]:
"""
Downloads the O*NET database, creates a local SQLite file from it, and returns a connection.
The version is the sha256 of the downloaded zip file.
"""
url = "https://www.onetcenter.org/dl_files/database/db_29_1_mysql.zip"
logger.info(f"Downloading O*NET database from {url}")
response = requests.get(url, stream=True)
response.raise_for_status()
# Read content into memory
zip_content = response.content
version = hashlib.sha256(zip_content).hexdigest()
logger.info(f"O*NET database version (sha256): {version}")
db_path = run.cache_dir / f"onet_{version}.db"
if db_path.exists():
logger.info(f"Using cached O*NET database: {db_path}")
conn = sqlite3.connect(db_path)
# Set PRAGMA for foreign keys on every connection
conn.execute("PRAGMA foreign_keys = ON;")
return conn, version
logger.info(f"Creating new O*NET database: {db_path}")
conn = sqlite3.connect(db_path)
# Set performance PRAGMAs for fast import
logger.info("Creating new SQLite database with performance settings")
conn.executescript("""
PRAGMA journal_mode = OFF;
PRAGMA synchronous = 0;
PRAGMA cache_size = 1000000;
PRAGMA locking_mode = EXCLUSIVE;
PRAGMA temp_store = MEMORY;
PRAGMA foreign_keys = ON;
""")
with zipfile.ZipFile(io.BytesIO(zip_content)) as z:
sql_scripts = []
for filename in sorted(z.namelist()):
if filename.endswith(".sql"):
sql_scripts.append(z.read(filename).decode('utf-8'))
if not sql_scripts:
raise RuntimeError("No SQL files found in the O*NET zip archive.")
# Combine and execute all SQL files in one transaction
full_script = "BEGIN TRANSACTION;\n" + "\n".join(sql_scripts) + "\nCOMMIT;"
logger.info("Executing SQL files in alphabetical order (single transaction mode)")
conn.executescript(full_script)
logger.info("Database populated successfully. Restoring reliability settings...")
# Restore reliability-focused settings after import
conn.executescript("""
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
PRAGMA locking_mode = NORMAL;
PRAGMA temp_store = DEFAULT;
PRAGMA foreign_keys = ON;
PRAGMA optimize;
""")
conn.execute("VACUUM;")
conn.commit()
logger.info("Reliability settings restored and database optimized successfully!")
return conn, version
def fetch_oesm_data(run: Run) -> Tuple[pd.DataFrame, str]:
"""
Downloads the OESM national data from the BLS website.
The version is the sha256 of the downloaded zip file.
"""
url = "https://www.bls.gov/oes/special-requests/oesm23nat.zip"
logger.info(f"Downloading OESM data from {url}")
response = requests.get(url)
response.raise_for_status()
zip_content = response.content
version = hashlib.sha256(zip_content).hexdigest()
logger.info(f"OESM data version (sha256): {version}")
parquet_path = run.cache_dir / f"oesm_{version}.parquet"
if parquet_path.exists():
logger.info(f"Using cached OESM data: {parquet_path}")
return pd.read_parquet(parquet_path), version
logger.info(f"Creating new OESM data cache: {parquet_path}")
with zipfile.ZipFile(io.BytesIO(zip_content)) as z:
# Find the excel file in the zip
excel_filename = None
for filename in z.namelist():
logger.debug(f"Found file in OESM zip: {filename}")
if filename.lower().endswith(".xlsx"):
excel_filename = filename
break
if excel_filename is None:
raise FileNotFoundError("Could not find the Excel file in the OESM zip archive.")
logger.info(f"Reading {excel_filename} from zip archive.")
with z.open(excel_filename) as f:
df = pd.read_excel(f, engine='openpyxl')
df.to_parquet(parquet_path)
logger.info(f"Saved OESM data to cache: {parquet_path}")
return df, version
def fetch_epoch_remote_data(run: Run) -> Tuple[pd.DataFrame, str]:
"""
Downloads the EPOCH AI remote work task data.
The version is the sha256 of the downloaded CSV file.
"""
# This is the direct download link constructed from the Google Drive share link
url = "https://drive.google.com/uc?export=download&id=1GrHhuYIgaCCgo99dZ_40BWraz-fzo76r"
logger.info(f"Downloading EPOCH remote data from Google Drive: {url}")
# Need to handle potential cookies/redirects from Google Drive
session = requests.Session()
response = session.get(url, stream=True)
response.raise_for_status()
csv_content = response.content
version = hashlib.sha256(csv_content).hexdigest()
logger.info(f"EPOCH remote data version (sha256): {version}")
parquet_path = run.cache_dir / f"epoch_remote_{version}.parquet"
if parquet_path.exists():
logger.info(f"Using cached EPOCH remote data: {parquet_path}")
return pd.read_parquet(parquet_path), version
logger.info(f"Creating new EPOCH remote data cache: {parquet_path}")
df = pd.read_csv(io.BytesIO(csv_content))
df.to_parquet(parquet_path)
logger.info(f"Saved EPOCH remote data to cache: {parquet_path}")
return df, version