import logging
import re
import requests
import shutil
import sqlite3
import zipfile
from pathlib import Path

# Configure logging to provide feedback during the data setup process
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Constants ---
# Using a data directory at the root of the project
DATA_DIR = Path("data")

# O*NET database details. We download the MySQL version and convert it to SQLite.
ONET_MYSQL_URL = "https://www.onetcenter.org/dl_files/database/db_29_3_mysql.zip"
DB_ZIP_PATH = DATA_DIR / "onet_mysql.zip"
DB_FILE_PATH = DATA_DIR / "onet.db"
EXTRACT_DIR = DATA_DIR / "onet_mysql_extracted"

# URLs for other required data files are in a separate text data archive.
ONET_TEXT_URL = "https://www.onetcenter.org/dl_files/database/db_29_3_text.zip"
TEXT_ZIP_PATH = DATA_DIR / "onet_text.zip"
TASK_RATINGS_PATH = DATA_DIR / "Task Ratings.txt"
DWA_REFERENCE_PATH = DATA_DIR / "DWA Reference.txt"


def setup_data_and_database():
    """
    Main function to orchestrate the data setup.

    It ensures the data directory exists, then downloads and sets up the
    O*NET database and any other required data files.
    """
    logging.info("Starting data and database setup...")
    DATA_DIR.mkdir(exist_ok=True)
    _setup_onet_database()
    _download_additional_data()
    logging.info("Data and database setup complete.")


def _setup_onet_database():
    """
    Downloads the O*NET MySQL database, extracts it, and imports it into a new
    SQLite database, following performance best practices from a shell script.

    This method performs minimal text-based conversion of the MySQL dump to
    make it compatible with SQLite before importing.
    """
    if DB_FILE_PATH.exists():
        logging.info("O*NET database already exists at %s. Skipping setup.", DB_FILE_PATH)
        return

    logging.info("O*NET database not found. Starting fresh setup.")

    # Ensure the extraction directory is clean before use
    if EXTRACT_DIR.exists():
        shutil.rmtree(EXTRACT_DIR)
    EXTRACT_DIR.mkdir()

    try:
        # 1. Download if necessary
        if not DB_ZIP_PATH.exists():
            logging.info("Downloading O*NET database from %s", ONET_MYSQL_URL)
            _download_file(ONET_MYSQL_URL, DB_ZIP_PATH)
        else:
            logging.info("Using existing O*NET zip file at %s", DB_ZIP_PATH)

        # 2. Extract
        logging.info("Extracting O*NET database files to %s", EXTRACT_DIR)
        with zipfile.ZipFile(DB_ZIP_PATH, 'r') as zip_ref:
            zip_ref.extractall(EXTRACT_DIR)

        # 3. Create new DB with performance PRAGMAs
        logging.info("Creating new SQLite database with performance settings: %s", DB_FILE_PATH)
        conn = sqlite3.connect(DB_FILE_PATH)
        conn.executescript("""
            PRAGMA journal_mode = OFF;
            PRAGMA synchronous = 0;
            PRAGMA cache_size = 1000000;
            PRAGMA locking_mode = EXCLUSIVE;
            PRAGMA temp_store = MEMORY;
        """)
        conn.close()

        # 4. Combine all SQL files, convert, and import in a single transaction
        logging.info("Combining and converting SQL files for single transaction import...")
        sql_files = sorted(EXTRACT_DIR.rglob('*.sql'))
        if not sql_files:
            raise FileNotFoundError(f"No SQL files found in {EXTRACT_DIR}")

        # Concatenate all files into one string
        mysql_dump = "\n".join([sql_file.read_text(encoding='utf-8') for sql_file in sql_files])

        # Minimal conversion for SQLite: remove backticks and ENGINE clauses
        sqlite_dump = mysql_dump.replace('`', '')
        sqlite_dump = re.sub(r'\) ENGINE=InnoDB.*?;', ');', sqlite_dump, flags=re.DOTALL)
        full_script = f"BEGIN TRANSACTION;\n{sqlite_dump}\nCOMMIT;"

        logging.info("Importing %d SQL files into database...", len(sql_files))
        conn = sqlite3.connect(DB_FILE_PATH)
        conn.executescript(full_script)
        conn.close()
        logging.info("Database populated successfully.")

        # 5. Restore reliability settings and optimize
        logging.info("Restoring reliability settings and optimizing database...")
        conn = sqlite3.connect(DB_FILE_PATH)
        conn.executescript("""
            PRAGMA journal_mode = WAL;
            PRAGMA synchronous = NORMAL;
            PRAGMA locking_mode = NORMAL;
            PRAGMA temp_store = DEFAULT;
            PRAGMA foreign_keys = ON;
            PRAGMA optimize;
        """)
        conn.execute("VACUUM;")
        conn.close()
        logging.info("Database setup and optimization complete.")

    except Exception as e:
        logging.error("Failed during database setup: %s", e, exc_info=True)
        if DB_FILE_PATH.exists():
            DB_FILE_PATH.unlink()
        raise
    finally:
        # 6. Cleanup
        logging.info("Cleaning up temporary files...")
        if DB_ZIP_PATH.exists():
            DB_ZIP_PATH.unlink()
        if EXTRACT_DIR.exists():
            shutil.rmtree(EXTRACT_DIR)


def _download_additional_data():
    """
    Downloads and extracts supplementary data files from the O*NET text archive.

    If the required text files already exist, this function does nothing.
    """
    required_files = [TASK_RATINGS_PATH, DWA_REFERENCE_PATH]
    if all(p.exists() for p in required_files):
        logging.info("All required text data files already exist. Skipping download.")
        return

    logging.info("One or more text data files are missing. Downloading and extracting from archive...")
    try:
        _download_file(ONET_TEXT_URL, TEXT_ZIP_PATH)

        logging.info("Unzipping text data archive...")
        with zipfile.ZipFile(TEXT_ZIP_PATH, 'r') as zip_ref:
            # Extract only the files we need, without creating subdirectories
            for target_path in required_files:
                if not target_path.exists():
                    # Find the corresponding file within the zip archive's directory structure
                    member_name = next((m for m in zip_ref.namelist() if m.endswith(target_path.name)), None)
                    if member_name:
                        with zip_ref.open(member_name) as source, open(target_path, 'wb') as target:
                            target.write(source.read())
                        logging.info("Extracted %s", target_path.name)
                    else:
                        logging.warning("Could not find %s in the text data archive.", target_path.name)
    except requests.exceptions.RequestException as e:
        logging.error("Failed to download O*NET text data archive: %s", e)
        raise
    except zipfile.BadZipFile as e:
        logging.error("Failed to process the text data archive: %s", e)
        raise
    finally:
        # Clean up the downloaded zip file
        if TEXT_ZIP_PATH.exists():
            TEXT_ZIP_PATH.unlink()
            logging.info("Cleaned up downloaded text archive zip file.")


def _download_file(url, destination):
    """
    Helper function to download a file from a URL, with streaming for large files.
    """
    logging.info("Downloading from %s to %s", url, destination)
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(destination, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
    logging.info("Download of %s complete.", destination.name)


def get_db_connection():
    """
    Establishes and returns a connection to the SQLite database.

    Returns None if the database file does not exist.
    """
    if not DB_FILE_PATH.exists():
        logging.error("Database file not found at %s. Run the setup process first.", DB_FILE_PATH)
        return None
    try:
        conn = sqlite3.connect(DB_FILE_PATH)
        return conn
    except sqlite3.Error as e:
        logging.error("Failed to connect to the database: %s", e)
        return None


if __name__ == '__main__':
    # This allows the data setup to be run directly from the command line,
    # which is useful for initialization or debugging.
    setup_data_and_database()