# Import necessary libraries import pandas as pd import litellm # Ensure this is installed in your environment import dotenv import os import time import json import math import numpy as np # Added for NaN handling # Load environment variables dotenv.load_dotenv(override=True) # --- Configuration --- MODEL = "gpt-4.1-mini" # Consider adjusting RATE_LIMIT based on the specific model's actual limits RATE_LIMIT = 5000 # Max requests per minute # Smaller chunk size results in more frequent saving but potentially slower overall processing CHUNK_SIZE = 10 # Process messages in chunks of this size SECONDS_PER_MINUTE = 60 # **UPDATED:** Filename changed as requested FILENAME = "task_to_estimate.csv" # Use a single filename for in-place updates # --- Prompts and Schema --- SYSTEM_PROMPT = """ You are an expert assistant evaluating the time required for job tasks. Your goal is to estimate the 'effective time' range needed for a skilled human to complete the following job task **remotely**, without supervision 'Effective time' is the active, focused work duration required to complete the task. Crucially, **exclude all waiting periods, delays, or time spent on other unrelated activities**. Think of it as the continuous, productive time investment needed if the worker could pause and resume instantly without cost. Provide a lower and upper bound estimate for the 'effective time'. These bounds should capture the time within which approximately 80% of instances of performing this specific task are typically completed by a qualified individual. You MUST output a JSON object containing the lower and upper bound estimates. Select your lower and upper bound estimates **only** from the following discrete durations: ['10 minutes', '30 minutes', '1 hour', '2 hours', '4 hours', '8 hours', '16 hours', '3 days', '1 week', '3 weeks', '6 weeks', '3 months', '6 months', '1 year', '3 years', '10 years'] Example Output Format: { "lower_bound_estimate": "1 hour", "upper_bound_estimate": "4 hours" } Base your estimate on the provided task description, its associated activities, and the occupational context. Only output the JSON object. """.strip() # Modified prompt slightly to emphasize JSON output for response_format mode # Template uses the correct column names based on previous update USER_MESSAGE_TEMPLATE = """ Please estimate the effective time range for the following remote task: **Occupation Category:** {occupation_title} **Occupation Description:** {occupation_description} **Task Description:** {task} **Relevant steps for the task:** {dwas} Consider the complexity and the typical steps involved. Output ONLY the JSON object with keys "lower_bound_estimate" and "upper_bound_estimate". """.strip() # Modified prompt slightly to emphasize JSON output for response_format mode ALLOWED_DURATIONS = [ "10 minutes", "30 minutes", "1 hour", "2 hours", "4 hours", "8 hours", "16 hours", "3 days", "1 week", "3 weeks", "6 weeks", "3 months", "6 months", "1 year", "3 years", "10 years", ] # Schema definition for litellm's response_format validation # **REVERTED:** Using the schema definition compatible with response_format SCHEMA_FOR_VALIDATION = { "name": "get_time_estimate", "strict": True, "schema": { "type": "object", "properties": { "lower_bound_estimate": {"type": "string", "enum": ALLOWED_DURATIONS}, "upper_bound_estimate": {"type": "string", "enum": ALLOWED_DURATIONS}, }, "required": ["lower_bound_estimate", "upper_bound_estimate"], "additionalProperties": False, }, } # --- Function to Save DataFrame In-Place --- def save_dataframe(df_to_save, filename): """Saves the DataFrame to the specified CSV file using atomic write.""" try: # Use a temporary file for atomic write to prevent corruption if script crashes during save temp_filename = filename + ".tmp" df_to_save.to_csv(temp_filename, encoding="utf-8-sig", index=False) os.replace(temp_filename, filename) # Atomic replace # print(f"--- DataFrame successfully saved to {filename} ---") # Optional: uncomment for verbose logging except Exception as e: print(f"--- Error saving DataFrame to {filename}: {e} ---") # Clean up temp file if rename failed if os.path.exists(temp_filename): try: os.remove(temp_filename) except Exception as remove_err: print( f"--- Error removing temporary save file {temp_filename}: {remove_err} ---" ) # --- Main Script Logic --- try: # Read the CSV if os.path.exists(FILENAME): df = pd.read_csv(FILENAME, encoding="utf-8-sig") print(f"Successfully read {len(df)} rows from {FILENAME}.") # Check if estimate columns exist, add them if not, initialized with NaN save_needed = False if "lb_estimate" not in df.columns: df["lb_estimate"] = np.nan print("Added 'lb_estimate' column.") save_needed = True # Ensure column is float/object type to hold NaNs and strings elif not pd.api.types.is_object_dtype( df["lb_estimate"] ) and not pd.api.types.is_float_dtype(df["lb_estimate"]): df["lb_estimate"] = df["lb_estimate"].astype(object) if "ub_estimate" not in df.columns: df["ub_estimate"] = np.nan print("Added 'ub_estimate' column.") save_needed = True elif not pd.api.types.is_object_dtype( df["ub_estimate"] ) and not pd.api.types.is_float_dtype(df["ub_estimate"]): df["ub_estimate"] = df["ub_estimate"].astype(object) # Fill potential empty strings or other placeholders with actual NaN for consistency df["lb_estimate"].replace(["", None], np.nan, inplace=True) df["ub_estimate"].replace(["", None], np.nan, inplace=True) if save_needed: print(f"Saving {FILENAME} after adding missing estimate columns.") save_dataframe(df, FILENAME) else: print(f"Error: {FILENAME} not found. Please ensure the file exists.") exit() except FileNotFoundError: print(f"Error: {FILENAME} not found. Please ensure the file exists.") exit() except Exception as e: print(f"Error reading or initializing {FILENAME}: {e}") exit() # --- Identify Rows to Process --- unprocessed_mask = df["lb_estimate"].isna() start_index = unprocessed_mask.idxmax() # Finds the index of the first True value if unprocessed_mask.any() and pd.isna(df.loc[start_index, "lb_estimate"]): print(f"Resuming processing from index {start_index}.") df_to_process = df.loc[unprocessed_mask].copy() original_indices = df_to_process.index # Keep track of original indices else: print("All rows seem to have estimates already. Exiting.") exit() # --- Prepare messages for batch completion (only for rows needing processing) --- messages_list = [] skipped_rows_indices = [] valid_original_indices = [] if not df_to_process.empty: # Use the correct column names required_cols = ["task", "occupation_title", "occupation_description", "dwas"] print( f"Preparing messages for up to {len(df_to_process)} rows starting from index {start_index}..." ) print(f"Checking for required columns: {required_cols}") for index, row in df_to_process.iterrows(): missing_or_empty = [] for col in required_cols: if col not in row or pd.isna(row[col]) or str(row[col]).strip() == "": missing_or_empty.append(col) if missing_or_empty: print( f"Warning: Skipping row index {index} due to missing/empty required data in columns: {', '.join(missing_or_empty)}." ) skipped_rows_indices.append(index) continue # Format user message using the template with correct column names try: user_message = USER_MESSAGE_TEMPLATE.format( task=row["task"], occupation_title=row["occupation_title"], occupation_description=row["occupation_description"], dwas=row["dwas"], ) except KeyError as e: print( f"Error: Skipping row index {index} due to formatting error - missing key: {e}. Check USER_MESSAGE_TEMPLATE and CSV columns." ) skipped_rows_indices.append(index) continue messages_for_row = [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": user_message}, ] messages_list.append(messages_for_row) valid_original_indices.append(index) print( f"Prepared {len(messages_list)} valid message sets for batch completion (skipped {len(skipped_rows_indices)} rows)." ) if not messages_list: print("No valid rows found to process after checking required data. Exiting.") exit() else: print("No rows found needing processing.") exit() # --- Call batch_completion in chunks with rate limiting and periodic saving --- total_messages_to_send = len(messages_list) num_chunks = math.ceil(total_messages_to_send / CHUNK_SIZE) print( f"\nStarting batch completion for {total_messages_to_send} items in {num_chunks} chunks..." ) overall_start_time = time.time() processed_count_total = 0 for i in range(num_chunks): chunk_start_message_index = i * CHUNK_SIZE chunk_end_message_index = min((i + 1) * CHUNK_SIZE, total_messages_to_send) message_chunk = messages_list[chunk_start_message_index:chunk_end_message_index] chunk_original_indices = valid_original_indices[ chunk_start_message_index:chunk_end_message_index ] if not message_chunk: continue min_idx = min(chunk_original_indices) if chunk_original_indices else "N/A" max_idx = max(chunk_original_indices) if chunk_original_indices else "N/A" print( f"\nProcessing chunk {i + 1}/{num_chunks} (Messages {chunk_start_message_index + 1}-{chunk_end_message_index} of this run)..." f" Corresponding to original indices: {min_idx} - {max_idx}" ) chunk_start_time = time.time() responses = [] try: print(f"Sending {len(message_chunk)} requests for chunk {i + 1}...") # **REVERTED:** Using response_format with json_schema responses = litellm.batch_completion( model=MODEL, messages=message_chunk, response_format={ "type": "json_schema", "json_schema": SCHEMA_FOR_VALIDATION, }, num_retries=3, # request_timeout=60 # Optional: uncomment if needed ) print(f"Chunk {i + 1} API call completed.") except Exception as e: print(f"Error during litellm.batch_completion for chunk {i + 1}: {e}") responses = [None] * len(message_chunk) # --- Process responses for the current chunk --- chunk_lb_estimates = {} chunk_ub_estimates = {} successful_in_chunk = 0 failed_in_chunk = 0 if responses and len(responses) == len(message_chunk): for j, response in enumerate(responses): original_df_index = chunk_original_indices[j] lb_estimate = None ub_estimate = None content_str = None # Initialize for potential error logging if response is None: print( f"Skipping processing for original index {original_df_index} due to API call failure for this item/chunk." ) failed_in_chunk += 1 continue try: # **REVERTED:** Check for content in the message, not tool_calls if ( response.choices and response.choices[0].message and response.choices[0].message.content # Check if content exists ): content_str = response.choices[0].message.content # Attempt to parse the JSON string content estimate_data = json.loads(content_str) lb_estimate = estimate_data.get("lower_bound_estimate") ub_estimate = estimate_data.get("upper_bound_estimate") # Validate against allowed durations if ( lb_estimate in ALLOWED_DURATIONS and ub_estimate in ALLOWED_DURATIONS ): successful_in_chunk += 1 else: print( f"Warning: Invalid duration value(s) in JSON for original index {original_df_index}. LB: '{lb_estimate}', UB: '{ub_estimate}'. Setting to None." ) lb_estimate = None ub_estimate = None failed_in_chunk += 1 else: # Handle cases where the response structure is unexpected or indicates an error finish_reason = ( response.choices[0].finish_reason if (response.choices and response.choices[0].finish_reason) else "unknown" ) print( f"Warning: Received non-standard or empty response content for original index {original_df_index}. " f"Finish Reason: '{finish_reason}'. Raw Response Choices: {response.choices}" ) failed_in_chunk += 1 except json.JSONDecodeError: # Log content_str which failed parsing print( f"Warning: Could not decode JSON for original index {original_df_index}. Content received: '{content_str}'" ) failed_in_chunk += 1 except AttributeError as ae: print( f"Warning: Missing expected attribute processing response for original index {original_df_index}: {ae}. Response: {response}" ) failed_in_chunk += 1 except Exception as e: print( f"Warning: An unexpected error occurred processing response for original index {original_df_index}: {type(e).__name__} - {e}. Response: {response}" ) failed_in_chunk += 1 # Store successfully parsed results if lb_estimate is not None: chunk_lb_estimates[original_df_index] = lb_estimate if ub_estimate is not None: chunk_ub_estimates[original_df_index] = ub_estimate else: print( f"Warning: Mismatch between number of responses ({len(responses) if responses else 0}) " f"and messages sent ({len(message_chunk)}) for chunk {i + 1}. Marking all as failed." ) failed_in_chunk = len(message_chunk) print( f"Chunk {i + 1} processing summary: Success={successful_in_chunk}, Failed/Skipped={failed_in_chunk}" ) processed_count_total += successful_in_chunk # --- Update Main DataFrame and Save Periodically --- if chunk_lb_estimates or chunk_ub_estimates: print( f"Updating main DataFrame with {len(chunk_lb_estimates)} LB and {len(chunk_ub_estimates)} UB estimates for chunk {i + 1}..." ) if not pd.api.types.is_object_dtype(df["lb_estimate"]): df["lb_estimate"] = df["lb_estimate"].astype(object) if not pd.api.types.is_object_dtype(df["ub_estimate"]): df["ub_estimate"] = df["ub_estimate"].astype(object) for idx, lb in chunk_lb_estimates.items(): if idx in df.index: df.loc[idx, "lb_estimate"] = lb for idx, ub in chunk_ub_estimates.items(): if idx in df.index: df.loc[idx, "ub_estimate"] = ub print(f"Saving progress to {FILENAME}...") save_dataframe(df, FILENAME) else: print(f"No successful estimates obtained in chunk {i + 1} to save.") # --- Rate Limiting Pause --- chunk_end_time = time.time() chunk_duration = chunk_end_time - chunk_start_time print(f"Chunk {i + 1} took {chunk_duration:.2f} seconds.") if i < num_chunks - 1: time_per_request = SECONDS_PER_MINUTE / RATE_LIMIT if RATE_LIMIT > 0 else 0 min_chunk_duration_for_rate = len(message_chunk) * time_per_request pause_needed = max(0, min_chunk_duration_for_rate - chunk_duration) if pause_needed > 0: print( f"Pausing for {pause_needed:.2f} seconds to respect rate limit ({RATE_LIMIT}/min)..." ) time.sleep(pause_needed) overall_end_time = time.time() total_duration_minutes = (overall_end_time - overall_start_time) / 60 print( f"\nBatch completion finished." f" Processed {processed_count_total} new estimates in this run in {total_duration_minutes:.2f} minutes." ) print(f"Performing final save check to {FILENAME}...") save_dataframe(df, FILENAME) print("\nScript finished.")