sprint-econtai/analysis/data.py
Félix Dorn 43076bcbb1 old
2025-07-15 00:41:05 +02:00

207 lines
8 KiB
Python

import logging
import re
import requests
import shutil
import sqlite3
import zipfile
from pathlib import Path
# Configure logging to provide feedback during the data setup process
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Constants ---
# Using a data directory at the root of the project
DATA_DIR = Path("data")

# O*NET database details. We download the MySQL version and convert it to SQLite.
ONET_MYSQL_URL = "https://www.onetcenter.org/dl_files/database/db_29_3_mysql.zip"
DB_ZIP_PATH = DATA_DIR / "onet_mysql.zip"        # downloaded MySQL dump archive (temporary)
DB_FILE_PATH = DATA_DIR / "onet.db"              # final SQLite database produced by setup
EXTRACT_DIR = DATA_DIR / "onet_mysql_extracted"  # scratch dir for the unzipped .sql files (temporary)

# URLs for other required data files are in a separate text data archive.
ONET_TEXT_URL = "https://www.onetcenter.org/dl_files/database/db_29_3_text.zip"
TEXT_ZIP_PATH = DATA_DIR / "onet_text.zip"       # downloaded text archive (temporary)
TASK_RATINGS_PATH = DATA_DIR / "Task Ratings.txt"
DWA_REFERENCE_PATH = DATA_DIR / "DWA Reference.txt"
def setup_data_and_database():
    """
    Main function to orchestrate the data setup.

    Ensures the data directory exists, then downloads and sets up the O*NET
    database and any other required data files. Each step is idempotent:
    work whose outputs already exist is skipped, so this is safe to re-run.
    """
    logging.info("Starting data and database setup...")
    # parents=True keeps this robust if DATA_DIR is ever changed to a nested path.
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    _setup_onet_database()
    _download_additional_data()
    logging.info("Data and database setup complete.")
def _convert_mysql_dump_to_sqlite(mysql_dump):
    """Minimal textual conversion of a MySQL dump for SQLite.

    Strips backtick identifier quoting and replaces MySQL ``ENGINE=InnoDB...``
    table options (up to the statement terminator) with a plain ``);``.
    """
    sqlite_dump = mysql_dump.replace('`', '')
    return re.sub(r'\) ENGINE=InnoDB.*?;', ');', sqlite_dump, flags=re.DOTALL)


def _run_sqlite_script(script):
    """Execute *script* against DB_FILE_PATH, always closing the connection.

    The try/finally guarantees the connection is released even when a
    statement fails, so a failed import never leaves the database file
    held open (which would also break the cleanup unlink on some platforms).
    """
    conn = sqlite3.connect(DB_FILE_PATH)
    try:
        conn.executescript(script)
    finally:
        conn.close()


def _setup_onet_database():
    """
    Downloads the O*NET MySQL database, extracts it, and imports it into a
    new SQLite database, following performance best practices: bulk import
    with durability disabled, then restore reliability settings and optimize.

    Performs minimal text-based conversion of the MySQL dump to make it
    compatible with SQLite before importing. On failure the partially built
    database file is removed and the exception re-raised; temporary files
    are cleaned up in all cases.
    """
    if DB_FILE_PATH.exists():
        logging.info("O*NET database already exists at %s. Skipping setup.", DB_FILE_PATH)
        return

    logging.info("O*NET database not found. Starting fresh setup.")
    # Ensure the extraction directory is clean before use
    if EXTRACT_DIR.exists():
        shutil.rmtree(EXTRACT_DIR)
    EXTRACT_DIR.mkdir(parents=True)

    try:
        # 1. Download if necessary
        if not DB_ZIP_PATH.exists():
            logging.info("Downloading O*NET database from %s", ONET_MYSQL_URL)
            _download_file(ONET_MYSQL_URL, DB_ZIP_PATH)
        else:
            logging.info("Using existing O*NET zip file at %s", DB_ZIP_PATH)

        # 2. Extract
        logging.info("Extracting O*NET database files to %s", EXTRACT_DIR)
        with zipfile.ZipFile(DB_ZIP_PATH, 'r') as zip_ref:
            zip_ref.extractall(EXTRACT_DIR)

        # 3. Create new DB with performance PRAGMAs. Durability is
        #    deliberately disabled for the bulk import; restored in step 5.
        logging.info("Creating new SQLite database with performance settings: %s", DB_FILE_PATH)
        _run_sqlite_script("""
            PRAGMA journal_mode = OFF;
            PRAGMA synchronous = 0;
            PRAGMA cache_size = 1000000;
            PRAGMA locking_mode = EXCLUSIVE;
            PRAGMA temp_store = MEMORY;
        """)

        # 4. Combine all SQL files, convert, and import in a single transaction
        logging.info("Combining and converting SQL files for single transaction import...")
        sql_files = sorted(EXTRACT_DIR.rglob('*.sql'))
        if not sql_files:
            raise FileNotFoundError(f"No SQL files found in {EXTRACT_DIR}")

        # Concatenate all files into one string, then convert for SQLite.
        mysql_dump = "\n".join(sql_file.read_text(encoding='utf-8') for sql_file in sql_files)
        sqlite_dump = _convert_mysql_dump_to_sqlite(mysql_dump)
        # One explicit transaction around the whole dump keeps the import fast.
        full_script = f"BEGIN TRANSACTION;\n{sqlite_dump}\nCOMMIT;"

        logging.info("Importing %d SQL files into database...", len(sql_files))
        _run_sqlite_script(full_script)
        logging.info("Database populated successfully.")

        # 5. Restore reliability settings and optimize
        logging.info("Restoring reliability settings and optimizing database...")
        _run_sqlite_script("""
            PRAGMA journal_mode = WAL;
            PRAGMA synchronous = NORMAL;
            PRAGMA locking_mode = NORMAL;
            PRAGMA temp_store = DEFAULT;
            PRAGMA foreign_keys = ON;
            PRAGMA optimize;
        """)
        # VACUUM cannot run inside an open transaction; execute it on its own.
        _run_sqlite_script("VACUUM;")
        logging.info("Database setup and optimization complete.")
    except Exception as e:
        logging.error("Failed during database setup: %s", e, exc_info=True)
        # Remove the partially built database so the next run starts cleanly.
        if DB_FILE_PATH.exists():
            DB_FILE_PATH.unlink()
        raise
    finally:
        # 6. Cleanup of temporary download/extraction artifacts, in all cases.
        logging.info("Cleaning up temporary files...")
        if DB_ZIP_PATH.exists():
            DB_ZIP_PATH.unlink()
        if EXTRACT_DIR.exists():
            shutil.rmtree(EXTRACT_DIR)
def _download_additional_data():
    """
    Downloads and extracts supplementary data files from the O*NET text archive.

    If the required text files already exist, this function does nothing.
    The downloaded zip archive is always removed afterwards.

    Raises:
        requests.exceptions.RequestException: if the download fails.
        zipfile.BadZipFile: if the downloaded archive is corrupt.
    """
    required_files = [TASK_RATINGS_PATH, DWA_REFERENCE_PATH]
    if all(p.exists() for p in required_files):
        logging.info("All required text data files already exist. Skipping download.")
        return

    logging.info("One or more text data files are missing. Downloading and extracting from archive...")
    try:
        _download_file(ONET_TEXT_URL, TEXT_ZIP_PATH)
        logging.info("Unzipping text data archive...")
        with zipfile.ZipFile(TEXT_ZIP_PATH, 'r') as zip_ref:
            # Extract only the files we need, without creating subdirectories
            for target_path in required_files:
                if target_path.exists():
                    continue
                # Find the corresponding file within the zip archive's directory structure
                member_name = next((m for m in zip_ref.namelist() if m.endswith(target_path.name)), None)
                if member_name:
                    # Stream the member to disk instead of reading it fully into memory.
                    with zip_ref.open(member_name) as source, open(target_path, 'wb') as target:
                        shutil.copyfileobj(source, target)
                    logging.info("Extracted %s", target_path.name)
                else:
                    logging.warning("Could not find %s in the text data archive.", target_path.name)
    except requests.exceptions.RequestException as e:
        logging.error("Failed to download O*NET text data archive: %s", e)
        raise
    except zipfile.BadZipFile as e:
        logging.error("Failed to process the text data archive: %s", e)
        raise
    finally:
        # Clean up the downloaded zip file
        if TEXT_ZIP_PATH.exists():
            TEXT_ZIP_PATH.unlink()
            logging.info("Cleaned up downloaded text archive zip file.")
def _download_file(url, destination):
    """
    Helper function to download a file from a URL, streaming in chunks so
    large files are never held fully in memory.

    Args:
        url: Source URL to fetch.
        destination: pathlib.Path the response body is written to.

    Raises:
        requests.exceptions.RequestException: on connection errors, timeouts,
            or non-2xx HTTP status codes.
    """
    logging.info("Downloading from %s to %s", url, destination)
    # (connect, read) timeout: without it a stalled connection hangs forever.
    with requests.get(url, stream=True, timeout=(10, 300)) as r:
        r.raise_for_status()
        with open(destination, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
    logging.info("Download of %s complete.", destination.name)
def get_db_connection():
    """
    Open and return a sqlite3 connection to the O*NET database.

    Returns None (after logging an error) when the database file does not
    exist yet or when the connection attempt fails.
    """
    if not DB_FILE_PATH.exists():
        logging.error("Database file not found at %s. Run the setup process first.", DB_FILE_PATH)
        return None
    try:
        return sqlite3.connect(DB_FILE_PATH)
    except sqlite3.Error as e:
        logging.error("Failed to connect to the database: %s", e)
        return None
if __name__ == '__main__':
    # This allows the data setup to be run directly from the command line,
    # which is useful for initialization or debugging.
    setup_data_and_database()