import pandas as pd
import litellm
import dotenv
import os
import time
import json
import math

# Load environment variables
dotenv.load_dotenv(override=True)
# litellm._turn_on_debug()  # Optional debugging

# --- Configuration ---
MODEL = "gpt-4.1-mini"  # Make sure this model supports json_schema / structured output
RATE_LIMIT = 5000  # Requests per minute
CHUNK_SIZE = 300  # Number of unique tasks per API call
SECONDS_PER_MINUTE = 60

# File configuration
CLASSIFICATION_FILENAME = "tasks_estimateable.csv"  # Output file with classifications
TASK_SOURCE_FOR_INIT_FILENAME = "tasks_with_estimates.csv"
OUTPUT_COLUMN_NAME = "task_estimateable"
SOURCE_FILTER_COLUMN = "remote_status"
SOURCE_FILTER_VALUE = "remote"

# --- Prompts and Schema ---
SYSTEM_PROMPT_CLASSIFY = """
Classify the provided O*NET task into one of these categories:
- ATOMIC (schedulable): A single, clearly-bounded activity, typically lasting minutes, hours, or a few days.
- ONGOING-CONSTRAINT (background role/ethical rule): A continuous responsibility or behavioural norm with no schedulable duration (e.g., “follow confidentiality rules,” “serve as department head”).
""".strip()

USER_MESSAGE_TEMPLATE_CLASSIFY = "Task: {task}"

CLASSIFICATION_CATEGORIES = ["ATOMIC", "ONGOING-CONSTRAINT"]

SCHEMA_FOR_CLASSIFICATION = {
    "name": "classify_task_type",
    "strict": True,
    "schema": {
        "type": "object",
        "properties": {
            "task_category": {
                "type": "string",
                "enum": CLASSIFICATION_CATEGORIES,
                "description": "The classification of the task (ATOMIC or ONGOING-CONSTRAINT).",
            }
        },
        "required": ["task_category"],
        "additionalProperties": False,
    },
}


def save_dataframe(df_to_save, filename):
    """Saves the DataFrame to the specified CSV file using an atomic write."""
    try:
        temp_filename = filename + ".tmp"
        df_to_save.to_csv(temp_filename, encoding="utf-8-sig", index=False)
        os.replace(temp_filename, filename)
    except Exception as e:
        print(f"--- Error saving DataFrame to {filename}: {e} ---")
        if os.path.exists(temp_filename):
            try:
                os.remove(temp_filename)
            except Exception as remove_err:
                print(
                    f"--- Error removing temporary save file {temp_filename}: {remove_err} ---"
                )


# --- Load or Initialize DataFrame ---
try:
    if os.path.exists(CLASSIFICATION_FILENAME):
        df = pd.read_csv(CLASSIFICATION_FILENAME, encoding="utf-8-sig")
        print(f"Successfully read {len(df)} rows from {CLASSIFICATION_FILENAME}.")

        save_needed_after_load = False
        if OUTPUT_COLUMN_NAME not in df.columns:
            df[OUTPUT_COLUMN_NAME] = pd.NA
            print(f"Added '{OUTPUT_COLUMN_NAME}' column.")
            save_needed_after_load = True

        # Normalise empty strings/None to pd.NA (plain assignment avoids the
        # chained-assignment pitfall of calling .replace(..., inplace=True) on a column).
        df[OUTPUT_COLUMN_NAME] = df[OUTPUT_COLUMN_NAME].replace(["", None], pd.NA)

        if df[OUTPUT_COLUMN_NAME].dtype != object and not isinstance(
            df[OUTPUT_COLUMN_NAME].dtype, pd.StringDtype
        ):
            try:
                df[OUTPUT_COLUMN_NAME] = df[OUTPUT_COLUMN_NAME].astype(object)
                print(
                    f"Corrected dtype of '{OUTPUT_COLUMN_NAME}' to {df[OUTPUT_COLUMN_NAME].dtype}."
                )
                save_needed_after_load = True
            except Exception as e:
                print(
                    f"Warning: Could not convert column '{OUTPUT_COLUMN_NAME}' to object: {e}."
                )

        if "task" not in df.columns:
            print(
                f"Error: {CLASSIFICATION_FILENAME} must contain a 'task' column for processing."
            )
            exit()

        if save_needed_after_load:
            print(f"Saving {CLASSIFICATION_FILENAME} after adding/adjusting column.")
            save_dataframe(df, CLASSIFICATION_FILENAME)
    else:
        print(
            f"{CLASSIFICATION_FILENAME} not found. Attempting to create it from {TASK_SOURCE_FOR_INIT_FILENAME}."
        )
        if not os.path.exists(TASK_SOURCE_FOR_INIT_FILENAME):
            print(
                f"Error: Source file {TASK_SOURCE_FOR_INIT_FILENAME} not found. "
                f"Cannot create {CLASSIFICATION_FILENAME}."
            )
            exit()

        df_source = pd.read_csv(TASK_SOURCE_FOR_INIT_FILENAME, encoding="utf-8-sig")
        required_source_cols_for_init = ["task", SOURCE_FILTER_COLUMN]
        missing_source_cols = [
            col for col in required_source_cols_for_init if col not in df_source.columns
        ]
        if missing_source_cols:
            print(
                f"Error: Source file {TASK_SOURCE_FOR_INIT_FILENAME} is missing required "
                f"columns for initialization: {', '.join(missing_source_cols)}."
            )
            exit()

        df_source_filtered = df_source[
            df_source[SOURCE_FILTER_COLUMN] == SOURCE_FILTER_VALUE
        ].copy()

        if df_source_filtered.empty:
            print(
                f"Warning: No tasks with '{SOURCE_FILTER_COLUMN}' == '{SOURCE_FILTER_VALUE}' found in {TASK_SOURCE_FOR_INIT_FILENAME}. "
                f"{CLASSIFICATION_FILENAME} will be created with schema but no tasks to classify initially."
            )

        df = df_source_filtered[["task"]].copy()
        df[OUTPUT_COLUMN_NAME] = pd.NA
        df[OUTPUT_COLUMN_NAME] = df[OUTPUT_COLUMN_NAME].astype(object)

        print(
            f"Created {CLASSIFICATION_FILENAME} using tasks from {TASK_SOURCE_FOR_INIT_FILENAME} "
            f"(where {SOURCE_FILTER_COLUMN}='{SOURCE_FILTER_VALUE}'). New file has {len(df)} tasks."
        )
        save_dataframe(df, CLASSIFICATION_FILENAME)

except FileNotFoundError:
    print("Error: A required file was not found. Please check paths.")
    exit()
except Exception as e:
    print(f"Error during DataFrame loading or initialization: {e}")
    exit()

# --- Identify Unique Tasks to Process ---
if df.empty:
    print(f"{CLASSIFICATION_FILENAME} is empty. Nothing to process. Exiting.")
    exit()

initial_unprocessed_mask = df[OUTPUT_COLUMN_NAME].isna()
if not initial_unprocessed_mask.any():
    print(
        f"All tasks in {CLASSIFICATION_FILENAME} seem to have been classified already. Exiting."
    )
    exit()

# Filter for rows that are unprocessed AND have a valid 'task' string
valid_tasks_to_consider_df = df[
    initial_unprocessed_mask & df["task"].notna() & (df["task"].str.strip() != "")
]

if valid_tasks_to_consider_df.empty:
    print(
        "No valid, unclassified tasks found to process (after filtering out empty/NaN task descriptions). Exiting."
    )
    exit()

unique_task_labels_for_api = (
    valid_tasks_to_consider_df["task"].drop_duplicates().tolist()
)

total_rows_to_update_potentially = len(df[initial_unprocessed_mask])  # Count all rows that are NA
print(
    f"Found {total_rows_to_update_potentially} total rows in {CLASSIFICATION_FILENAME} needing classification."
)
print(
    f"Identified {len(unique_task_labels_for_api)} unique, valid task labels to send to the API."
)

# --- Prepare messages for batch completion (only for unique task labels) ---
messages_list = []
print(f"Preparing messages for {len(unique_task_labels_for_api)} unique task labels...")
for task_label in unique_task_labels_for_api:
    # task_label is already guaranteed to be non-empty and not NaN by the filtering above
    user_message = USER_MESSAGE_TEMPLATE_CLASSIFY.format(task=task_label)
    messages_for_task = [
        {"role": "system", "content": SYSTEM_PROMPT_CLASSIFY},
        {"role": "user", "content": user_message},
    ]
    messages_list.append(messages_for_task)

print(f"Prepared {len(messages_list)} message sets for batch completion.")

if not messages_list:  # Should only happen if unique_task_labels_for_api was empty, caught above
    print(
        "No messages prepared, though unique tasks were identified. This is unexpected. Exiting."
    )
    exit()
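# Note (added, illustrative): the chunked loop below relies on two assumptions that
# the response-handling code makes explicit: litellm.batch_completion returns one
# response per message set, in the same order as the chunk it was given, and each
# successful response carries a JSON body matching SCHEMA_FOR_CLASSIFICATION,
# e.g. {"task_category": "ATOMIC"}.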
# --- Call batch_completion in chunks with rate limiting and periodic saving ---
total_unique_tasks_to_send = len(messages_list)  # Same as len(unique_task_labels_for_api)
num_chunks = math.ceil(total_unique_tasks_to_send / CHUNK_SIZE)
print(
    f"\nStarting batch classification for {total_unique_tasks_to_send} unique task labels in {num_chunks} chunks..."
)

overall_start_time = time.time()
processed_rows_count_total = 0  # Counts actual rows updated in the DataFrame

for i in range(num_chunks):
    chunk_start_message_index = i * CHUNK_SIZE
    chunk_end_message_index = min((i + 1) * CHUNK_SIZE, total_unique_tasks_to_send)
    message_chunk = messages_list[chunk_start_message_index:chunk_end_message_index]
    # Get the corresponding unique task labels for this chunk
    chunk_task_labels = unique_task_labels_for_api[
        chunk_start_message_index:chunk_end_message_index
    ]

    if not message_chunk:  # Should not happen if the loop range is correct
        continue

    print(
        f"\nProcessing chunk {i + 1}/{num_chunks} (Unique Task Labels "
        f"{chunk_start_message_index + 1}-{chunk_end_message_index} of this run)..."
    )
    chunk_start_time = time.time()

    responses = []
    try:
        print(
            f"Sending {len(message_chunk)} requests (for unique tasks) for chunk {i + 1}..."
        )
        responses = litellm.batch_completion(
            model=MODEL,
            messages=message_chunk,
            response_format={
                "type": "json_schema",
                "json_schema": SCHEMA_FOR_CLASSIFICATION,
            },
            num_retries=3,
        )
        print(f"Chunk {i + 1} API call completed.")
    except Exception as e:
        print(f"Error during litellm.batch_completion for chunk {i + 1}: {e}")
        responses = [None] * len(message_chunk)

    # --- Process responses for the current chunk ---
    # chunk_task_classifications stores {task_label: classification_category}
    chunk_task_classifications = {}
    successful_api_calls_in_chunk = 0
    failed_api_calls_in_chunk = 0

    if responses and len(responses) == len(message_chunk):
        for j, response in enumerate(responses):
            current_task_label = chunk_task_labels[j]  # The unique task label for this response
            content_str = None

            if response is None:
                print(
                    f"API call failed for task label '{current_task_label}' (response is None)."
                )
                failed_api_calls_in_chunk += 1
                continue

            try:
                if (
                    response.choices
                    and response.choices[0].message
                    and response.choices[0].message.content
                ):
                    content_str = response.choices[0].message.content
                    classification_data = json.loads(content_str)
                    category_raw = classification_data.get("task_category")

                    if category_raw in CLASSIFICATION_CATEGORIES:
                        successful_api_calls_in_chunk += 1
                        chunk_task_classifications[current_task_label] = category_raw
                    else:
                        print(
                            f"Warning: Invalid or missing task_category for task label '{current_task_label}': '{category_raw}'. Content: '{content_str}'"
                        )
                        failed_api_calls_in_chunk += 1
                else:
                    finish_reason = (
                        response.choices[0].finish_reason
                        if (response.choices and response.choices[0].finish_reason)
                        else "unknown"
                    )
                    error_message = (
                        response.choices[0].message.content
                        if (response.choices and response.choices[0].message)
                        else "No content in message."
                    )
                    print(
                        f"Warning: Received non-standard or empty response content for task label '{current_task_label}'. "
                        f"Finish Reason: '{finish_reason}'. Message: '{error_message}'. Raw Choices: {response.choices}"
                    )
                    failed_api_calls_in_chunk += 1
            except json.JSONDecodeError:
                print(
                    f"Warning: Could not decode JSON for task label '{current_task_label}'. Content received: '{content_str}'"
                )
                failed_api_calls_in_chunk += 1
            except AttributeError as ae:
                print(
                    f"Warning: Missing attribute while processing response for task label "
                    f"'{current_task_label}': {ae}. Response: {response}"
                )
                failed_api_calls_in_chunk += 1
            except Exception as e:
                print(
                    f"Warning: Unexpected error processing response for task label "
                    f"'{current_task_label}': {type(e).__name__} - {e}. Response: {response}"
                )
                failed_api_calls_in_chunk += 1
    else:
        print(
            f"Warning: Mismatch between #responses ({len(responses) if responses else 0}) "
            f"and #messages sent ({len(message_chunk)}) for chunk {i + 1}, or no responses. "
            "Marking all API calls in chunk as failed."
        )
        failed_api_calls_in_chunk = len(message_chunk)

    # --- Update Main DataFrame and Save Periodically ---
    rows_updated_this_chunk = 0
    if chunk_task_classifications:
        print(
            f"Updating main DataFrame with classifications for {len(chunk_task_classifications)} unique tasks from chunk {i + 1}..."
        )
        for task_label, category in chunk_task_classifications.items():
            # Update all rows in the main df that match this task_label AND are still NA in the output column
            update_condition = (df["task"] == task_label) & (
                df[OUTPUT_COLUMN_NAME].isna()
            )
            num_rows_for_this_task_label = df[update_condition].shape[0]
            if num_rows_for_this_task_label > 0:
                df.loc[update_condition, OUTPUT_COLUMN_NAME] = category
                rows_updated_this_chunk += num_rows_for_this_task_label

        print(
            f"Updated {rows_updated_this_chunk} rows in the DataFrame based on this chunk's API responses."
        )
        print(f"Saving progress to {CLASSIFICATION_FILENAME}...")
        save_dataframe(df, CLASSIFICATION_FILENAME)
    else:
        print(
            f"No successful API classifications obtained in chunk {i + 1} to update DataFrame or save."
        )

    print(
        f"Chunk {i + 1} API summary: Successful Calls={successful_api_calls_in_chunk}, Failed/Skipped Calls={failed_api_calls_in_chunk}. "
        f"Rows updated in DataFrame this chunk: {rows_updated_this_chunk}"
    )
    processed_rows_count_total += rows_updated_this_chunk

    # --- Rate Limiting Pause ---
    chunk_end_time = time.time()
    chunk_duration = chunk_end_time - chunk_start_time
    print(f"Chunk {i + 1} (API calls and DF update) took {chunk_duration:.2f} seconds.")

    if i < num_chunks - 1:
        time_per_request = SECONDS_PER_MINUTE / RATE_LIMIT if RATE_LIMIT > 0 else 0
        min_chunk_duration_for_rate = len(message_chunk) * time_per_request  # Based on API calls made
        pause_needed = max(0, min_chunk_duration_for_rate - chunk_duration)
        if pause_needed > 0:
            print(
                f"Pausing for {pause_needed:.2f} seconds to respect rate limit ({RATE_LIMIT}/min)..."
            )
            time.sleep(pause_needed)

overall_end_time = time.time()
total_duration_minutes = (overall_end_time - overall_start_time) / 60
print(
    f"\nBatch classification finished."
    f" Updated {processed_rows_count_total} rows in '{CLASSIFICATION_FILENAME}' with new classifications in this run."
    f" Total duration: {total_duration_minutes:.2f} minutes."
)

print(f"Performing final save to {CLASSIFICATION_FILENAME}...")
save_dataframe(df, CLASSIFICATION_FILENAME)
print("\nScript finished.")
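# --- Optional post-run summary (added sketch; safe to remove) ---
# Quick look at how the classifications are distributed after this run,
# including rows that are still unclassified (shown as NaN/NA):
print("\nClassification distribution:")
print(df[OUTPUT_COLUMN_NAME].value_counts(dropna=False))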