import pandas as pd
import litellm
import dotenv
import os
import time
import json
import math

# Load environment variables
dotenv.load_dotenv(override=True)

# litellm._turn_on_debug()  # Optional debugging

# --- Configuration ---
MODEL = "gpt-4.1-mini"  # Make sure this model supports json_schema / structured output
RATE_LIMIT = 5000  # Requests per minute
CHUNK_SIZE = 300  # Number of unique tasks per API call
SECONDS_PER_MINUTE = 60
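# Rough pacing math for the defaults above: at RATE_LIMIT = 5000 requests/min each
# request "costs" 60 / 5000 = 0.012 s, so a full chunk of 300 unique tasks should
# take at least 300 * 0.012 = 3.6 s before the next chunk starts (this is what the
# rate-limiting pause at the bottom of the chunk loop enforces).
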
# File configuration
CLASSIFICATION_FILENAME = "tasks_estimateable.csv"  # Output file with classifications
TASK_SOURCE_FOR_INIT_FILENAME = "tasks_with_estimates.csv"
OUTPUT_COLUMN_NAME = "task_estimateable"
SOURCE_FILTER_COLUMN = "remote_status"
SOURCE_FILTER_VALUE = "remote"

# --- Prompts and Schema ---
SYSTEM_PROMPT_CLASSIFY = """
Classify the provided O*NET task into one of these categories:
- ATOMIC (schedulable): A single, clearly-bounded activity, typically lasting minutes, hours, or a few days.
- ONGOING-CONSTRAINT (background role/ethical rule): A continuous responsibility or behavioural norm with no schedulable duration (e.g., “follow confidentiality rules,” “serve as department head”).
""".strip()

USER_MESSAGE_TEMPLATE_CLASSIFY = "Task: {task}"

CLASSIFICATION_CATEGORIES = ["ATOMIC", "ONGOING-CONSTRAINT"]

SCHEMA_FOR_CLASSIFICATION = {
    "name": "classify_task_type",
    "strict": True,
    "schema": {
        "type": "object",
        "properties": {
            "task_category": {
                "type": "string",
                "enum": CLASSIFICATION_CATEGORIES,
                "description": "The classification of the task (ATOMIC or ONGOING-CONSTRAINT).",
            }
        },
        "required": ["task_category"],
        "additionalProperties": False,
    },
}
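# With "strict": True and the enum above, a conforming response body is exactly one of
# {"task_category": "ATOMIC"} or {"task_category": "ONGOING-CONSTRAINT"}; this schema is
# passed to the API via response_format in the chunk loop below.
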

def save_dataframe(df_to_save, filename):
    """Saves the DataFrame to the specified CSV file using an atomic write."""
    # Define the temp path outside the try block so the except handler can always reference it.
    temp_filename = filename + ".tmp"
    try:
        df_to_save.to_csv(temp_filename, encoding="utf-8-sig", index=False)
        os.replace(temp_filename, filename)
    except Exception as e:
        print(f"--- Error saving DataFrame to {filename}: {e} ---")
        if os.path.exists(temp_filename):
            try:
                os.remove(temp_filename)
            except Exception as remove_err:
                print(
                    f"--- Error removing temporary save file {temp_filename}: {remove_err} ---"
                )

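# Note on the save pattern above: writing to a ".tmp" file and then swapping it in with
# os.replace() means an interrupted save leaves the previous CSV intact rather than a
# half-written file, since the replace happens in a single step on the same filesystem.
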
# --- Load or Initialize DataFrame ---
try:
    if os.path.exists(CLASSIFICATION_FILENAME):
        df = pd.read_csv(CLASSIFICATION_FILENAME, encoding="utf-8-sig")
        print(f"Successfully read {len(df)} rows from {CLASSIFICATION_FILENAME}.")

        save_needed_after_load = False
        if OUTPUT_COLUMN_NAME not in df.columns:
            df[OUTPUT_COLUMN_NAME] = pd.NA
            print(f"Added '{OUTPUT_COLUMN_NAME}' column.")
            save_needed_after_load = True

        # Normalise empty strings / None to pd.NA so they count as unclassified
        df[OUTPUT_COLUMN_NAME] = df[OUTPUT_COLUMN_NAME].replace(["", None], pd.NA)

        if df[OUTPUT_COLUMN_NAME].dtype != object and not isinstance(
            df[OUTPUT_COLUMN_NAME].dtype, pd.StringDtype
        ):
            try:
                df[OUTPUT_COLUMN_NAME] = df[OUTPUT_COLUMN_NAME].astype(object)
                print(
                    f"Corrected dtype of '{OUTPUT_COLUMN_NAME}' to {df[OUTPUT_COLUMN_NAME].dtype}."
                )
                save_needed_after_load = True
            except Exception as e:
                print(
                    f"Warning: Could not convert column '{OUTPUT_COLUMN_NAME}' to object: {e}."
                )

        if "task" not in df.columns:
            print(
                f"Error: {CLASSIFICATION_FILENAME} must contain a 'task' column for processing."
            )
            exit()

        if save_needed_after_load:
            print(f"Saving {CLASSIFICATION_FILENAME} after adding/adjusting column.")
            save_dataframe(df, CLASSIFICATION_FILENAME)
    else:
        print(
            f"{CLASSIFICATION_FILENAME} not found. Attempting to create it from {TASK_SOURCE_FOR_INIT_FILENAME}."
        )
        if not os.path.exists(TASK_SOURCE_FOR_INIT_FILENAME):
            print(
                f"Error: Source file {TASK_SOURCE_FOR_INIT_FILENAME} not found. Cannot create {CLASSIFICATION_FILENAME}."
            )
            exit()

        df_source = pd.read_csv(TASK_SOURCE_FOR_INIT_FILENAME, encoding="utf-8-sig")

        required_source_cols_for_init = ["task", SOURCE_FILTER_COLUMN]
        missing_source_cols = [
            col for col in required_source_cols_for_init if col not in df_source.columns
        ]
        if missing_source_cols:
            print(
                f"Error: Source file {TASK_SOURCE_FOR_INIT_FILENAME} is missing required columns for initialization: {', '.join(missing_source_cols)}."
            )
            exit()

        df_source_filtered = df_source[
            df_source[SOURCE_FILTER_COLUMN] == SOURCE_FILTER_VALUE
        ].copy()

        if df_source_filtered.empty:
            print(
                f"Warning: No tasks with '{SOURCE_FILTER_COLUMN}' == '{SOURCE_FILTER_VALUE}' found in {TASK_SOURCE_FOR_INIT_FILENAME}. "
                f"{CLASSIFICATION_FILENAME} will be created with schema but no tasks to classify initially."
            )

        df = df_source_filtered[["task"]].copy()
        df[OUTPUT_COLUMN_NAME] = pd.NA
        df[OUTPUT_COLUMN_NAME] = df[OUTPUT_COLUMN_NAME].astype(object)

        print(
            f"Created {CLASSIFICATION_FILENAME} using tasks from {TASK_SOURCE_FOR_INIT_FILENAME} "
            f"(where {SOURCE_FILTER_COLUMN}='{SOURCE_FILTER_VALUE}'). New file has {len(df)} tasks."
        )
        save_dataframe(df, CLASSIFICATION_FILENAME)

except FileNotFoundError:
    print("Error: A required file was not found. Please check paths.")
    exit()
except Exception as e:
    print(f"Error during DataFrame loading or initialization: {e}")
    exit()

# --- Identify Unique Tasks to Process ---
if df.empty:
    print(f"{CLASSIFICATION_FILENAME} is empty. Nothing to process. Exiting.")
    exit()

initial_unprocessed_mask = df[OUTPUT_COLUMN_NAME].isna()

if not initial_unprocessed_mask.any():
    print(
        f"All tasks in {CLASSIFICATION_FILENAME} seem to have been classified already. Exiting."
    )
    exit()

# Filter for rows that are unprocessed AND have a valid 'task' string
valid_tasks_to_consider_df = df[
    initial_unprocessed_mask & df["task"].notna() & (df["task"].str.strip() != "")
]

if valid_tasks_to_consider_df.empty:
    print(
        "No valid, unclassified tasks found to process (after filtering out empty/NaN task descriptions). Exiting."
    )
    exit()

unique_task_labels_for_api = (
    valid_tasks_to_consider_df["task"].drop_duplicates().tolist()
)
total_rows_to_update_potentially = len(
    df[initial_unprocessed_mask]
)  # Count all rows that are NA

print(
    f"Found {total_rows_to_update_potentially} total rows in {CLASSIFICATION_FILENAME} needing classification."
)
print(
    f"Identified {len(unique_task_labels_for_api)} unique, valid task labels to send to the API."
)
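# Only unique task labels are sent to the API; when responses come back, each label's
# classification is written to every still-unclassified row that has the same 'task'
# text (see the DataFrame update loop inside the chunk loop below).
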
# --- Prepare messages for batch completion (only for unique task labels) ---
messages_list = []
print(f"Preparing messages for {len(unique_task_labels_for_api)} unique task labels...")

for task_label in unique_task_labels_for_api:
    # task_label is already guaranteed to be non-empty and not NaN from the filtering above
    user_message = USER_MESSAGE_TEMPLATE_CLASSIFY.format(task=task_label)
    messages_for_task = [
        {"role": "system", "content": SYSTEM_PROMPT_CLASSIFY},
        {"role": "user", "content": user_message},
    ]
    messages_list.append(messages_for_task)

print(f"Prepared {len(messages_list)} message sets for batch completion.")
if (
    not messages_list
):  # Should only happen if unique_task_labels_for_api was empty, caught above
    print(
        "No messages prepared, though unique tasks were identified. This is unexpected. Exiting."
    )
    exit()
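# Each entry in messages_list is a two-message chat; for illustration (the task text
# here is a made-up example, not from the data):
# [{"role": "system", "content": SYSTEM_PROMPT_CLASSIFY},
#  {"role": "user", "content": "Task: Answer customer questions about product availability."}]
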
# --- Call batch_completion in chunks with rate limiting and periodic saving ---
total_unique_tasks_to_send = len(
    messages_list
)  # Same as len(unique_task_labels_for_api)
num_chunks = math.ceil(total_unique_tasks_to_send / CHUNK_SIZE)

print(
    f"\nStarting batch classification for {total_unique_tasks_to_send} unique task labels in {num_chunks} chunks..."
)

overall_start_time = time.time()
processed_rows_count_total = 0  # Counts actual rows updated in the DataFrame

for i in range(num_chunks):
    chunk_start_message_index = i * CHUNK_SIZE
    chunk_end_message_index = min((i + 1) * CHUNK_SIZE, total_unique_tasks_to_send)

    message_chunk = messages_list[chunk_start_message_index:chunk_end_message_index]
    # Get corresponding unique task labels for this chunk
    chunk_task_labels = unique_task_labels_for_api[
        chunk_start_message_index:chunk_end_message_index
    ]

    if not message_chunk:  # Should not happen if loop range is correct
        continue

    print(
        f"\nProcessing chunk {i + 1}/{num_chunks} (Unique Task Labels {chunk_start_message_index + 1}-{chunk_end_message_index} of this run)..."
    )
    chunk_start_time = time.time()
    responses = []
    try:
        print(
            f"Sending {len(message_chunk)} requests (for unique tasks) for chunk {i + 1}..."
        )
        responses = litellm.batch_completion(
            model=MODEL,
            messages=message_chunk,
            response_format={
                "type": "json_schema",
                "json_schema": SCHEMA_FOR_CLASSIFICATION,
            },
            num_retries=3,
        )
        print(f"Chunk {i + 1} API call completed.")

    except Exception as e:
        print(f"Error during litellm.batch_completion for chunk {i + 1}: {e}")
        responses = [None] * len(message_chunk)

    # --- Process responses for the current chunk ---
    # chunk_task_classifications stores {task_label: classification_category}
    chunk_task_classifications = {}
    successful_api_calls_in_chunk = 0
    failed_api_calls_in_chunk = 0

    if responses and len(responses) == len(message_chunk):
        for j, response in enumerate(responses):
            current_task_label = chunk_task_labels[
                j
            ]  # The unique task label for this response
            content_str = None

            if response is None:
                print(
                    f"API call failed for task label '{current_task_label}' (response is None)."
                )
                failed_api_calls_in_chunk += 1
                continue

            try:
                if (
                    response.choices
                    and response.choices[0].message
                    and response.choices[0].message.content
                ):
                    content_str = response.choices[0].message.content
                    classification_data = json.loads(content_str)
                    category_raw = classification_data.get("task_category")

                    if category_raw in CLASSIFICATION_CATEGORIES:
                        successful_api_calls_in_chunk += 1
                        chunk_task_classifications[current_task_label] = category_raw
                    else:
                        print(
                            f"Warning: Invalid or missing task_category for task label '{current_task_label}': '{category_raw}'. Content: '{content_str}'"
                        )
                        failed_api_calls_in_chunk += 1
                else:
                    finish_reason = (
                        response.choices[0].finish_reason
                        if (response.choices and response.choices[0].finish_reason)
                        else "unknown"
                    )
                    error_message = (
                        response.choices[0].message.content
                        if (response.choices and response.choices[0].message)
                        else "No content in message."
                    )
                    print(
                        f"Warning: Received non-standard or empty response content for task label '{current_task_label}'. "
                        f"Finish Reason: '{finish_reason}'. Message: '{error_message}'. Raw Choices: {response.choices}"
                    )
                    failed_api_calls_in_chunk += 1

            except json.JSONDecodeError:
                print(
                    f"Warning: Could not decode JSON for task label '{current_task_label}'. Content received: '{content_str}'"
                )
                failed_api_calls_in_chunk += 1
            except AttributeError as ae:
                print(
                    f"Warning: Missing attribute processing response for task label '{current_task_label}': {ae}. Response: {response}"
                )
                failed_api_calls_in_chunk += 1
            except Exception as e:
                print(
                    f"Warning: Unexpected error processing response for task label '{current_task_label}': {type(e).__name__} - {e}. Response: {response}"
                )
                failed_api_calls_in_chunk += 1
    else:
        print(
            f"Warning: Mismatch between #responses ({len(responses) if responses else 0}) "
            f"and #messages sent ({len(message_chunk)}) for chunk {i + 1}, or no responses. Marking all API calls in chunk as failed."
        )
        failed_api_calls_in_chunk = len(message_chunk)

    # --- Update Main DataFrame and Save Periodically ---
    rows_updated_this_chunk = 0
    if chunk_task_classifications:
        print(
            f"Updating main DataFrame with classifications for {len(chunk_task_classifications)} unique tasks from chunk {i + 1}..."
        )
        for task_label, category in chunk_task_classifications.items():
            # Update all rows in the main df that match this task_label AND are still NA in the output column
            update_condition = (df["task"] == task_label) & (
                df[OUTPUT_COLUMN_NAME].isna()
            )
            num_rows_for_this_task_label = df[update_condition].shape[0]

            if num_rows_for_this_task_label > 0:
                df.loc[update_condition, OUTPUT_COLUMN_NAME] = category
                rows_updated_this_chunk += num_rows_for_this_task_label

        print(
            f"Updated {rows_updated_this_chunk} rows in the DataFrame based on this chunk's API responses."
        )
        print(f"Saving progress to {CLASSIFICATION_FILENAME}...")
        save_dataframe(df, CLASSIFICATION_FILENAME)
    else:
        print(
            f"No successful API classifications obtained in chunk {i + 1} to update DataFrame or save."
        )

    print(
        f"Chunk {i + 1} API summary: Successful Calls={successful_api_calls_in_chunk}, Failed/Skipped Calls={failed_api_calls_in_chunk}. "
        f"Rows updated in DataFrame this chunk: {rows_updated_this_chunk}"
    )
    processed_rows_count_total += rows_updated_this_chunk

    # --- Rate Limiting Pause ---
    chunk_end_time = time.time()
    chunk_duration = chunk_end_time - chunk_start_time
    print(f"Chunk {i + 1} (API calls and DF update) took {chunk_duration:.2f} seconds.")

    if i < num_chunks - 1:
        time_per_request = SECONDS_PER_MINUTE / RATE_LIMIT if RATE_LIMIT > 0 else 0
        min_chunk_duration_for_rate = (
            len(message_chunk) * time_per_request
        )  # Based on API calls made
        pause_needed = max(0, min_chunk_duration_for_rate - chunk_duration)

        if pause_needed > 0:
            print(
                f"Pausing for {pause_needed:.2f} seconds to respect rate limit ({RATE_LIMIT}/min)..."
            )
            time.sleep(pause_needed)

overall_end_time = time.time()
total_duration_minutes = (overall_end_time - overall_start_time) / 60
print(
    f"\nBatch classification finished."
    f" Updated {processed_rows_count_total} rows in '{CLASSIFICATION_FILENAME}' with new classifications in this run."
    f" Total duration: {total_duration_minutes:.2f} minutes."
)

print(f"Performing final save to {CLASSIFICATION_FILENAME}...")
save_dataframe(df, CLASSIFICATION_FILENAME)

print("\nScript finished.")
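
# Typical run (assumptions about local setup, not guaranteed by this script):
#   1. Put the API key for MODEL in a .env file (e.g. OPENAI_API_KEY for OpenAI models
#      routed through litellm), which load_dotenv(override=True) will pick up.
#   2. Ensure tasks_with_estimates.csv (with 'task' and 'remote_status' columns) or an
#      existing tasks_estimateable.csv sits next to the script.
#   3. Run the script with python. Re-running is safe: rows whose task_estimateable is
#      already filled are skipped, and progress is saved after every chunk.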