import pandas as pd
import litellm
import dotenv
import os
import time
import json
import math
import numpy as np

# --- Configuration ---
MODEL = "gpt-4.1-mini"  # Make sure this model supports json_schema or structured output
RATE_LIMIT = 5000  # Requests per minute
CHUNK_SIZE = 300
SECONDS_PER_MINUTE = 60
FILENAME = (
    "tasks_with_estimates.csv"  # This CSV should contain the tasks to be processed
)

# --- Prompts and Schema ---
SYSTEM_PROMPT = """
You are an expert assistant evaluating the time to completion required for job tasks. Your goal is to estimate the time range needed for a skilled human to complete the following job task remotely, without supervision.

Provide a lower and upper bound estimate for the time to completion. These bounds should capture the time within which approximately 80% of instances of performing this specific task are typically completed by a qualified individual.

Base your estimate on the provided task description, its associated activities, and the occupational context. Your estimate must be in one of the allowed units: minute, hour, day, week, month, trimester, semester, year.
""".strip()

USER_MESSAGE_TEMPLATE = """
Please estimate the time range for the following remote task:

**Task Description:** {task}

**Relevant activities for the task:** {dwas}

**Occupation Category:** {occupation_title}

**Occupation Description:** {occupation_description}

Consider the complexity and the typical steps involved.
""".strip()

ALLOWED_UNITS = [
    "minute",
    "hour",
    "day",
    "week",
    "month",
    "trimester",
    "semester",
    "year",
]

SCHEMA_FOR_VALIDATION = {
    "name": "estimate_time",
    "strict": True,  # Enforce schema adherence
    "schema": {
        "type": "object",
        "properties": {
            "lower_bound_estimate": {
                "type": "object",
                "properties": {
                    "quantity": {
                        "type": "number",
                        "description": "The numerical value for the lower bound of the estimate.",
                    },
                    "unit": {
                        "type": "string",
                        "enum": ALLOWED_UNITS,
                        "description": "The unit of time for the lower bound.",
                    },
                },
                "required": ["quantity", "unit"],
                "additionalProperties": False,
            },
            "upper_bound_estimate": {
                "type": "object",
                "properties": {
                    "quantity": {
                        "type": "number",
                        "description": "The numerical value for the upper bound of the estimate.",
                    },
                    "unit": {
                        "type": "string",
                        "enum": ALLOWED_UNITS,
                        "description": "The unit of time for the upper bound.",
                    },
                },
                "required": ["quantity", "unit"],
                "additionalProperties": False,
            },
        },
        "required": ["lower_bound_estimate", "upper_bound_estimate"],
        "additionalProperties": False,
    },
}
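
# For reference, a response satisfying SCHEMA_FOR_VALIDATION parses to a JSON
# object shaped like the following (the quantities and units are illustrative,
# not output produced by this script):
#
#   {
#       "lower_bound_estimate": {"quantity": 30, "unit": "minute"},
#       "upper_bound_estimate": {"quantity": 2, "unit": "hour"}
#   }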


def save_dataframe(df_to_save, filename):
    """Saves the DataFrame to the specified CSV file using an atomic write."""
    try:
        temp_filename = filename + ".tmp"
        df_to_save.to_csv(temp_filename, encoding="utf-8-sig", index=False)
        os.replace(temp_filename, filename)
    except Exception as e:
        print(f"--- Error saving DataFrame to {filename}: {e} ---")
        if os.path.exists(temp_filename):
            try:
                os.remove(temp_filename)
            except Exception as remove_err:
                print(
                    f"--- Error removing temporary save file {temp_filename}: {remove_err} ---"
                )


def create_task_estimates():
    try:
        # Read the CSV
        if os.path.exists(FILENAME):
            df = pd.read_csv(FILENAME, encoding="utf-8-sig")
            print(f"Successfully read {len(df)} rows from {FILENAME}.")

            estimate_columns_spec = {
                "lb_estimate_qty": float,
                "lb_estimate_unit": object,
                "ub_estimate_qty": float,
                "ub_estimate_unit": object,
            }
            save_needed = False
            for col_name, target_dtype in estimate_columns_spec.items():
                if col_name not in df.columns:
                    # Initialize with a type-compatible missing value
                    if target_dtype == float:
                        df[col_name] = np.nan
                    else:  # object
                        df[col_name] = pd.NA
                    df[col_name] = df[col_name].astype(target_dtype)  # Enforce dtype
                    print(f"Added '{col_name}' column as {df[col_name].dtype}.")
                    save_needed = True
                else:
                    # Column exists, ensure correct dtype
                    current_pd_dtype = df[col_name].dtype
                    expected_pd_dtype = pd.Series(dtype=target_dtype).dtype
                    if current_pd_dtype != expected_pd_dtype:
                        try:
                            if target_dtype == float:
                                df[col_name] = pd.to_numeric(
                                    df[col_name], errors="coerce"
                                )
                            else:  # object
                                df[col_name] = df[col_name].astype(object)
                            print(
                                f"Corrected dtype of '{col_name}' to {df[col_name].dtype}."
                            )
                            save_needed = True
                        except Exception as e:
                            print(
                                f"Warning: Could not convert column '{col_name}' to {target_dtype}: {e}. Current dtype: {current_pd_dtype}"
                            )
                    # Standardize missing values (e.g., empty strings to NA/NaN)
                    # Replace common missing placeholders with pd.NA first
                    df[col_name] = df[col_name].replace(["", None], pd.NA)
                    if target_dtype == float:
                        # For float columns, ensure values are numeric (np.nan for missing)
                        df[col_name] = pd.to_numeric(df[col_name], errors="coerce")

            if save_needed:
                print(f"Saving {FILENAME} after adding/adjusting estimate columns.")
                save_dataframe(df, FILENAME)
        else:
            print(
                f"Error: {FILENAME} not found. Please ensure the file exists and contains task data."
            )
            exit()
    except FileNotFoundError:
        print(
            f"Error: {FILENAME} not found. Please ensure the file exists and contains task data."
        )
        exit()
    except Exception as e:
        print(f"Error reading or initializing {FILENAME}: {e}")
        exit()

    # --- Identify Rows to Process ---
    # We'll check for NaN in one of the primary quantity columns.
    unprocessed_mask = df["lb_estimate_qty"].isna()
    if unprocessed_mask.any():
        start_index = unprocessed_mask.idxmax()  # Finds the index of the first True value
        print(
            f"Resuming processing. First unprocessed row found at index {start_index}."
        )
        df_to_process = df.loc[unprocessed_mask].copy()
        original_indices = df_to_process.index  # Keep track of original indices
    else:
        print(
            "All rows seem to have estimates already (based on 'lb_estimate_qty'). Exiting."
        )
        exit()
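
    # Resume behavior, concretely: with a default RangeIndex, if rows 0-299
    # already have lb_estimate_qty filled and row 300 is the first NaN, the
    # mask is True from label 300 onward, idxmax() returns 300, and only the
    # unprocessed rows are re-queued. Rows skipped for missing input keep NaN
    # and are picked up again on the next run.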

    # --- Prepare messages for batch completion (only for rows needing processing) ---
    messages_list = []
    skipped_rows_indices = []
    valid_original_indices = []

    if not df_to_process.empty:
        required_cols = ["task", "occupation_title", "occupation_description", "dwas"]
        print(
            f"Preparing messages for up to {len(df_to_process)} rows starting from original index {original_indices[0] if len(original_indices) > 0 else 'N/A'}..."
        )
        print(f"Checking for required columns: {required_cols}")

        for index, row in df_to_process.iterrows():
            missing_or_empty = []
            for col in required_cols:
                if col not in row or pd.isna(row[col]) or str(row[col]).strip() == "":
                    missing_or_empty.append(col)
            if missing_or_empty:
                print(
                    f"Warning: Skipping row original index {index} due to missing/empty required data in columns: {', '.join(missing_or_empty)}."
                )
                skipped_rows_indices.append(index)
                continue

            try:
                user_message = USER_MESSAGE_TEMPLATE.format(
                    task=row["task"],
                    occupation_title=row["occupation_title"],
                    occupation_description=row["occupation_description"],
                    dwas=row["dwas"],
                )
            except KeyError as e:
                print(
                    f"Error: Skipping row original index {index} due to formatting error - missing key: {e}. Check USER_MESSAGE_TEMPLATE and CSV columns."
                )
                skipped_rows_indices.append(index)
                continue

            messages_for_row = [
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_message},
            ]
            messages_list.append(messages_for_row)
            valid_original_indices.append(index)  # This is the original DataFrame index

        print(
            f"Prepared {len(messages_list)} valid message sets for batch completion (skipped {len(skipped_rows_indices)} rows)."
        )
        if not messages_list:
            print("No valid rows found to process after checking required data. Exiting.")
            exit()
    else:
        print(
            "No rows found needing processing (df_to_process is empty)."
        )  # Should have been caught by earlier check
        exit()

    # --- Call batch_completion in chunks with rate limiting and periodic saving ---
    total_messages_to_send = len(messages_list)
    num_chunks = math.ceil(total_messages_to_send / CHUNK_SIZE)
    print(
        f"\nStarting batch completion for {total_messages_to_send} items in {num_chunks} chunks..."
    )
    overall_start_time = time.time()
    processed_count_total = 0
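
    # Chunking arithmetic, for illustration: 1,000 prepared messages with
    # CHUNK_SIZE = 300 give num_chunks = ceil(1000 / 300) = 4, i.e. chunks of
    # 300, 300, 300, and 100 requests.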

    for i in range(num_chunks):
        chunk_start_message_index = i * CHUNK_SIZE
        chunk_end_message_index = min((i + 1) * CHUNK_SIZE, total_messages_to_send)
        message_chunk = messages_list[chunk_start_message_index:chunk_end_message_index]
        # Get corresponding original DataFrame indices for this chunk
        chunk_original_indices = valid_original_indices[
            chunk_start_message_index:chunk_end_message_index
        ]

        if not message_chunk:
            continue

        min_idx_disp = min(chunk_original_indices) if chunk_original_indices else "N/A"
        max_idx_disp = max(chunk_original_indices) if chunk_original_indices else "N/A"
        print(
            f"\nProcessing chunk {i + 1}/{num_chunks} (Messages {chunk_start_message_index + 1}-{chunk_end_message_index} of this run)..."
            f" Corresponding to original indices: {min_idx_disp} - {max_idx_disp}"
        )

        chunk_start_time = time.time()
        responses = []
        try:
            print(f"Sending {len(message_chunk)} requests for chunk {i + 1}...")
            responses = litellm.batch_completion(
                model=MODEL,
                messages=message_chunk,
                response_format={
                    "type": "json_schema",
                    "json_schema": SCHEMA_FOR_VALIDATION,
                },
                num_retries=3,
                # request_timeout=60,  # Optional: uncomment if needed
            )
            print(f"Chunk {i + 1} API call completed.")
        except Exception as e:
            print(f"Error during litellm.batch_completion for chunk {i + 1}: {e}")
            # Ensure responses list matches message_chunk length for the processing loop
            responses = [None] * len(message_chunk)

        # --- Process responses for the current chunk ---
        chunk_updates = {}  # To store {original_df_index: {qty/unit data}}
        successful_in_chunk = 0
        failed_in_chunk = 0

        if responses and len(responses) == len(message_chunk):
            for j, response in enumerate(responses):
                original_df_index = chunk_original_indices[j]
                # Initialize values for this item
                lb_qty_val, lb_unit_val, ub_qty_val, ub_unit_val = None, None, None, None
                content_str = None

                if response is None:
                    print(
                        f"Skipping processing for original index {original_df_index} due to API call failure for this item (response is None)."
                    )
                    failed_in_chunk += 1
                    continue

                try:
                    if (
                        response.choices
                        and response.choices[0].message
                        and response.choices[0].message.content
                    ):
                        content_str = response.choices[0].message.content
                        estimate_data = json.loads(content_str)  # Can raise JSONDecodeError

                        lower_bound_dict = estimate_data.get("lower_bound_estimate")
                        upper_bound_dict = estimate_data.get("upper_bound_estimate")
                        valid_response_structure = isinstance(
                            lower_bound_dict, dict
                        ) and isinstance(upper_bound_dict, dict)

                        if valid_response_structure:
                            lb_qty_raw = lower_bound_dict.get("quantity")
                            lb_unit_raw = lower_bound_dict.get("unit")
                            ub_qty_raw = upper_bound_dict.get("quantity")
                            ub_unit_raw = upper_bound_dict.get("unit")
                            is_valid_item = True

                            # Validate LB Qty
                            if (
                                not isinstance(lb_qty_raw, (int, float))
                                or math.isnan(float(lb_qty_raw))
                                or float(lb_qty_raw) < 0
                            ):
                                print(
                                    f"Warning: Invalid lb_quantity for original index {original_df_index}: {lb_qty_raw}"
                                )
                                is_valid_item = False
                            else:
                                lb_qty_val = float(lb_qty_raw)

                            # Validate UB Qty
                            if (
                                not isinstance(ub_qty_raw, (int, float))
                                or math.isnan(float(ub_qty_raw))
                                or float(ub_qty_raw) < 0
                            ):
                                print(
                                    f"Warning: Invalid ub_quantity for original index {original_df_index}: {ub_qty_raw}"
                                )
                                is_valid_item = False
                            else:
                                ub_qty_val = float(ub_qty_raw)

                            # Validate Units
                            if lb_unit_raw not in ALLOWED_UNITS:
                                print(
                                    f"Warning: Invalid lb_unit for original index {original_df_index}: '{lb_unit_raw}'"
                                )
                                is_valid_item = False
                            else:
                                lb_unit_val = lb_unit_raw

                            if ub_unit_raw not in ALLOWED_UNITS:
                                print(
                                    f"Warning: Invalid ub_unit for original index {original_df_index}: '{ub_unit_raw}'"
                                )
                                is_valid_item = False
                            else:
                                ub_unit_val = ub_unit_raw

                            if is_valid_item:
                                successful_in_chunk += 1
                                chunk_updates[original_df_index] = {
                                    "lb_estimate_qty": lb_qty_val,
                                    "lb_estimate_unit": lb_unit_val,
                                    "ub_estimate_qty": ub_qty_val,
                                    "ub_estimate_unit": ub_unit_val,
                                }
                            else:
                                failed_in_chunk += 1  # Values remain None if not fully valid
                        else:
                            print(
                                f"Warning: Missing or malformed estimate dicts in JSON for original index {original_df_index}. Content: '{content_str}'"
                            )
                            failed_in_chunk += 1
                    else:
                        finish_reason = (
                            response.choices[0].finish_reason
                            if (response.choices and response.choices[0].finish_reason)
                            else "unknown"
                        )
                        error_message = (
                            response.choices[0].message.content
                            if (
                                response.choices
                                and response.choices[0].message
                                and response.choices[0].message.content
                            )
                            else "No content in message."
                        )
                        print(
                            f"Warning: Received non-standard or empty response content for original index {original_df_index}. "
                            f"Finish Reason: '{finish_reason}'. Message: '{error_message}'. Raw Choices: {response.choices}"
                        )
                        failed_in_chunk += 1

                except json.JSONDecodeError:
                    print(
                        f"Warning: Could not decode JSON for original index {original_df_index}. Content received: '{content_str}'"
                    )
                    failed_in_chunk += 1
                except AttributeError as ae:
                    print(
                        f"Warning: Missing expected attribute processing response for original index {original_df_index}: {ae}. Response: {response}"
                    )
                    failed_in_chunk += 1
                except Exception as e:
                    print(
                        f"Warning: An unexpected error occurred processing response for original index {original_df_index}: {type(e).__name__} - {e}. Response: {response}"
                    )
                    failed_in_chunk += 1
        else:
            print(
                f"Warning: Mismatch between number of responses ({len(responses) if responses else 0}) "
                f"and messages sent ({len(message_chunk)}) for chunk {i + 1}, or no responses. Marking all as failed."
            )
            # All items in this chunk are considered failed if the response list is unusable
            failed_in_chunk = len(message_chunk)

        print(
            f"Chunk {i + 1} processing summary: Success={successful_in_chunk}, Failed/Skipped={failed_in_chunk}"
        )
        processed_count_total += successful_in_chunk

        # --- Update Main DataFrame and Save Periodically ---
        if chunk_updates:
            print(
                f"Updating main DataFrame with {len(chunk_updates)} new estimates for chunk {i + 1}..."
            )
            for idx, estimates in chunk_updates.items():
                if idx in df.index:
                    df.loc[idx, "lb_estimate_qty"] = estimates["lb_estimate_qty"]
                    df.loc[idx, "lb_estimate_unit"] = estimates["lb_estimate_unit"]
                    df.loc[idx, "ub_estimate_qty"] = estimates["ub_estimate_qty"]
                    df.loc[idx, "ub_estimate_unit"] = estimates["ub_estimate_unit"]
            print(f"Saving progress to {FILENAME}...")
            save_dataframe(df, FILENAME)
        else:
            print(f"No successful estimates obtained in chunk {i + 1} to save.")
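
        # Rate-limit arithmetic with the configured values: 60 s / 5,000
        # requests = 0.012 s per request, so a full 300-request chunk should
        # span at least 300 * 0.012 = 3.6 s; any shortfall is slept off below.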

        # --- Rate Limiting Pause ---
        chunk_end_time = time.time()
        chunk_duration = chunk_end_time - chunk_start_time
        print(f"Chunk {i + 1} took {chunk_duration:.2f} seconds.")

        if i < num_chunks - 1:  # No pause after the last chunk
            # Calculate ideal time per request based on the rate limit
            time_per_request = SECONDS_PER_MINUTE / RATE_LIMIT if RATE_LIMIT > 0 else 0
            # Minimum duration this chunk should have taken to respect the rate limit
            min_chunk_duration_for_rate = len(message_chunk) * time_per_request
            # Calculate the pause needed, if any
            pause_needed = max(0, min_chunk_duration_for_rate - chunk_duration)
            if pause_needed > 0:
                print(
                    f"Pausing for {pause_needed:.2f} seconds to respect rate limit ({RATE_LIMIT}/min)..."
                )
                time.sleep(pause_needed)

    overall_end_time = time.time()
    total_duration_minutes = (overall_end_time - overall_start_time) / 60
    print(
        f"\nBatch completion finished."
        f" Processed {processed_count_total} new estimates in this run in {total_duration_minutes:.2f} minutes."
    )

    print(f"Performing final save to {FILENAME}...")
    save_dataframe(df, FILENAME)
    print("\nScript finished.")
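

if __name__ == "__main__":
    # Assumed entry point: load API credentials from a local .env file
    # (implied by the dotenv import), then run the pipeline.
    dotenv.load_dotenv()
    create_task_estimates()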