sprint-econtai/classify_estimateability_of_tasks.py
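"""Classify O*NET task statements as ATOMIC or ONGOING-CONSTRAINT.

The script reads (or bootstraps) tasks_estimateable.csv, sends each unique,
still-unclassified task label to the model via litellm.batch_completion in
rate-limited chunks, writes the returned category back to every matching row,
and saves progress after each chunk so interrupted runs can resume.
"""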
import pandas as pd
import litellm
import dotenv
import os
import time
import json
import math
# Load environment variables
dotenv.load_dotenv(override=True)
# litellm._turn_on_debug() # Optional debugging
# --- Configuration ---
MODEL = "gpt-4.1-mini" # Make sure this model supports json_schema or structured output
RATE_LIMIT = 5000 # Requests per minute
CHUNK_SIZE = 300 # Number of unique tasks per API call
SECONDS_PER_MINUTE = 60
# File configuration
CLASSIFICATION_FILENAME = "tasks_estimateable.csv" # Output file with classifications
TASK_SOURCE_FOR_INIT_FILENAME = "tasks_with_estimates.csv"
OUTPUT_COLUMN_NAME = "task_estimateable"
SOURCE_FILTER_COLUMN = "remote_status"
SOURCE_FILTER_VALUE = "remote"
# --- Prompts and Schema ---
SYSTEM_PROMPT_CLASSIFY = """
Classify the provided O*NET task into one of these categories:
- ATOMIC (schedulable): A single, clearly-bounded activity, typically lasting minutes, hours, or a few days.
- ONGOING-CONSTRAINT (background role/ethical rule): A continuous responsibility or behavioural norm with no schedulable duration (e.g., “follow confidentiality rules,” “serve as department head”).
""".strip()
USER_MESSAGE_TEMPLATE_CLASSIFY = "Task: {task}"
CLASSIFICATION_CATEGORIES = ["ATOMIC", "ONGOING-CONSTRAINT"]
SCHEMA_FOR_CLASSIFICATION = {
"name": "classify_task_type",
"strict": True,
"schema": {
"type": "object",
"properties": {
"task_category": {
"type": "string",
"enum": CLASSIFICATION_CATEGORIES,
"description": "The classification of the task (ATOMIC or ONGOING-CONSTRAINT).",
}
},
"required": ["task_category"],
"additionalProperties": False,
},
}
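# With "strict": True, structured output (assuming MODEL supports it, per the
# note above) constrains each response to a JSON object whose single
# "task_category" field is one of CLASSIFICATION_CATEGORIES, so the parsing
# below only needs json.loads() plus a membership check.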
def save_dataframe(df_to_save, filename):
"""Saves the DataFrame to the specified CSV file using atomic write."""
    temp_filename = filename + ".tmp"
    try:
        # Write to a temporary file first, then atomically swap it into place.
        df_to_save.to_csv(temp_filename, encoding="utf-8-sig", index=False)
        os.replace(temp_filename, filename)
except Exception as e:
print(f"--- Error saving DataFrame to {filename}: {e} ---")
if os.path.exists(temp_filename):
try:
os.remove(temp_filename)
except Exception as remove_err:
print(
f"--- Error removing temporary save file {temp_filename}: {remove_err} ---"
)
# --- Load or Initialize DataFrame ---
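# Resume-friendly startup: reuse the classification file if it exists (adding
# the output column when missing); otherwise bootstrap it from the source file,
# keeping only rows where SOURCE_FILTER_COLUMN equals SOURCE_FILTER_VALUE.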
try:
if os.path.exists(CLASSIFICATION_FILENAME):
df = pd.read_csv(CLASSIFICATION_FILENAME, encoding="utf-8-sig")
print(f"Successfully read {len(df)} rows from {CLASSIFICATION_FILENAME}.")
save_needed_after_load = False
if OUTPUT_COLUMN_NAME not in df.columns:
df[OUTPUT_COLUMN_NAME] = pd.NA
print(f"Added '{OUTPUT_COLUMN_NAME}' column.")
save_needed_after_load = True
        df[OUTPUT_COLUMN_NAME] = df[OUTPUT_COLUMN_NAME].replace(["", None], pd.NA)
if df[OUTPUT_COLUMN_NAME].dtype != object and not isinstance(
df[OUTPUT_COLUMN_NAME].dtype, pd.StringDtype
):
try:
df[OUTPUT_COLUMN_NAME] = df[OUTPUT_COLUMN_NAME].astype(object)
print(
f"Corrected dtype of '{OUTPUT_COLUMN_NAME}' to {df[OUTPUT_COLUMN_NAME].dtype}."
)
save_needed_after_load = True
except Exception as e:
print(
f"Warning: Could not convert column '{OUTPUT_COLUMN_NAME}' to object: {e}."
)
if "task" not in df.columns:
print(
f"Error: {CLASSIFICATION_FILENAME} must contain a 'task' column for processing."
)
exit()
if save_needed_after_load:
print(f"Saving {CLASSIFICATION_FILENAME} after adding/adjusting column.")
save_dataframe(df, CLASSIFICATION_FILENAME)
else:
print(
f"{CLASSIFICATION_FILENAME} not found. Attempting to create it from {TASK_SOURCE_FOR_INIT_FILENAME}."
)
if not os.path.exists(TASK_SOURCE_FOR_INIT_FILENAME):
print(
f"Error: Source file {TASK_SOURCE_FOR_INIT_FILENAME} not found. Cannot create {CLASSIFICATION_FILENAME}."
)
exit()
df_source = pd.read_csv(TASK_SOURCE_FOR_INIT_FILENAME, encoding="utf-8-sig")
required_source_cols_for_init = ["task", SOURCE_FILTER_COLUMN]
missing_source_cols = [
col for col in required_source_cols_for_init if col not in df_source.columns
]
if missing_source_cols:
print(
f"Error: Source file {TASK_SOURCE_FOR_INIT_FILENAME} is missing required columns for initialization: {', '.join(missing_source_cols)}."
)
exit()
df_source_filtered = df_source[
df_source[SOURCE_FILTER_COLUMN] == SOURCE_FILTER_VALUE
].copy()
if df_source_filtered.empty:
print(
f"Warning: No tasks with '{SOURCE_FILTER_COLUMN}' == '{SOURCE_FILTER_VALUE}' found in {TASK_SOURCE_FOR_INIT_FILENAME}. "
f"{CLASSIFICATION_FILENAME} will be created with schema but no tasks to classify initially."
)
df = df_source_filtered[["task"]].copy()
df[OUTPUT_COLUMN_NAME] = pd.NA
df[OUTPUT_COLUMN_NAME] = df[OUTPUT_COLUMN_NAME].astype(object)
print(
f"Created {CLASSIFICATION_FILENAME} using tasks from {TASK_SOURCE_FOR_INIT_FILENAME} "
f"(where {SOURCE_FILTER_COLUMN}='{SOURCE_FILTER_VALUE}'). New file has {len(df)} tasks."
)
save_dataframe(df, CLASSIFICATION_FILENAME)
except FileNotFoundError as e:
    print(f"Error: A required file was not found ({e}). Please check paths.")
exit()
except Exception as e:
print(f"Error during DataFrame loading or initialization: {e}")
exit()
# --- Identify Unique Tasks to Process ---
if df.empty:
print(f"{CLASSIFICATION_FILENAME} is empty. Nothing to process. Exiting.")
exit()
initial_unprocessed_mask = df[OUTPUT_COLUMN_NAME].isna()
if not initial_unprocessed_mask.any():
print(
f"All tasks in {CLASSIFICATION_FILENAME} seem to have been classified already. Exiting."
)
exit()
# Filter for rows that are unprocessed AND have a valid 'task' string
valid_tasks_to_consider_df = df[
initial_unprocessed_mask & df["task"].notna() & (df["task"].str.strip() != "")
]
if valid_tasks_to_consider_df.empty:
print(
f"No valid, unclassified tasks found to process (after filtering out empty/NaN task descriptions). Exiting."
)
exit()
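# The same task string can appear on multiple rows, so each unique label is
# sent to the API once and the result is later fanned out to every
# still-unclassified row carrying that label.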
unique_task_labels_for_api = (
valid_tasks_to_consider_df["task"].drop_duplicates().tolist()
)
total_rows_to_update_potentially = len(
df[initial_unprocessed_mask]
) # Count all rows that are NA
print(
f"Found {total_rows_to_update_potentially} total rows in {CLASSIFICATION_FILENAME} needing classification."
)
print(
f"Identified {len(unique_task_labels_for_api)} unique, valid task labels to send to the API."
)
# --- Prepare messages for batch completion (only for unique task labels) ---
messages_list = []
print(f"Preparing messages for {len(unique_task_labels_for_api)} unique task labels...")
for task_label in unique_task_labels_for_api:
# task_label is already guaranteed to be non-empty and not NaN from the filtering above
user_message = USER_MESSAGE_TEMPLATE_CLASSIFY.format(task=task_label)
messages_for_task = [
{"role": "system", "content": SYSTEM_PROMPT_CLASSIFY},
{"role": "user", "content": user_message},
]
messages_list.append(messages_for_task)
print(f"Prepared {len(messages_list)} message sets for batch completion.")
if (
not messages_list
): # Should only happen if unique_task_labels_for_api was empty, caught above
print(
"No messages prepared, though unique tasks were identified. This is unexpected. Exiting."
)
exit()
# --- Call batch_completion in chunks with rate limiting and periodic saving ---
total_unique_tasks_to_send = len(
messages_list
) # Same as len(unique_task_labels_for_api)
num_chunks = math.ceil(total_unique_tasks_to_send / CHUNK_SIZE)
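# Each chunk becomes one litellm.batch_completion call of up to CHUNK_SIZE
# requests; e.g. 1,000 unique labels at CHUNK_SIZE = 300 yields 4 chunks
# (300 + 300 + 300 + 100).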
print(
f"\nStarting batch classification for {total_unique_tasks_to_send} unique task labels in {num_chunks} chunks..."
)
overall_start_time = time.time()
processed_rows_count_total = 0 # Counts actual rows updated in the DataFrame
for i in range(num_chunks):
chunk_start_message_index = i * CHUNK_SIZE
chunk_end_message_index = min((i + 1) * CHUNK_SIZE, total_unique_tasks_to_send)
message_chunk = messages_list[chunk_start_message_index:chunk_end_message_index]
# Get corresponding unique task labels for this chunk
chunk_task_labels = unique_task_labels_for_api[
chunk_start_message_index:chunk_end_message_index
]
if not message_chunk: # Should not happen if loop range is correct
continue
print(
f"\nProcessing chunk {i + 1}/{num_chunks} (Unique Task Labels {chunk_start_message_index + 1}-{chunk_end_message_index} of this run)..."
)
chunk_start_time = time.time()
responses = []
try:
print(
f"Sending {len(message_chunk)} requests (for unique tasks) for chunk {i + 1}..."
)
responses = litellm.batch_completion(
model=MODEL,
messages=message_chunk,
response_format={
"type": "json_schema",
"json_schema": SCHEMA_FOR_CLASSIFICATION,
},
num_retries=3,
)
print(f"Chunk {i + 1} API call completed.")
except Exception as e:
print(f"Error during litellm.batch_completion for chunk {i + 1}: {e}")
responses = [None] * len(message_chunk)
# --- Process responses for the current chunk ---
# chunk_updates stores {task_label: classification_category}
chunk_task_classifications = {}
successful_api_calls_in_chunk = 0
failed_api_calls_in_chunk = 0
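    # litellm.batch_completion is expected to return one response per message,
    # in input order, so index j pairs each response with chunk_task_labels[j].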
if responses and len(responses) == len(message_chunk):
for j, response in enumerate(responses):
current_task_label = chunk_task_labels[
j
] # The unique task label for this response
content_str = None
if response is None:
print(
f"API call failed for task label '{current_task_label}' (response is None)."
)
failed_api_calls_in_chunk += 1
continue
try:
if (
response.choices
and response.choices[0].message
and response.choices[0].message.content
):
content_str = response.choices[0].message.content
classification_data = json.loads(content_str)
category_raw = classification_data.get("task_category")
if category_raw in CLASSIFICATION_CATEGORIES:
successful_api_calls_in_chunk += 1
chunk_task_classifications[current_task_label] = category_raw
else:
print(
f"Warning: Invalid or missing task_category for task label '{current_task_label}': '{category_raw}'. Content: '{content_str}'"
)
failed_api_calls_in_chunk += 1
else:
finish_reason = (
response.choices[0].finish_reason
if (response.choices and response.choices[0].finish_reason)
else "unknown"
)
error_message = (
response.choices[0].message.content
if (response.choices and response.choices[0].message)
else "No content in message."
)
print(
f"Warning: Received non-standard or empty response content for task label '{current_task_label}'. "
f"Finish Reason: '{finish_reason}'. Message: '{error_message}'. Raw Choices: {response.choices}"
)
failed_api_calls_in_chunk += 1
except json.JSONDecodeError:
print(
f"Warning: Could not decode JSON for task label '{current_task_label}'. Content received: '{content_str}'"
)
failed_api_calls_in_chunk += 1
except AttributeError as ae:
print(
f"Warning: Missing attribute processing response for task label '{current_task_label}': {ae}. Response: {response}"
)
failed_api_calls_in_chunk += 1
except Exception as e:
print(
f"Warning: Unexpected error processing response for task label '{current_task_label}': {type(e).__name__} - {e}. Response: {response}"
)
failed_api_calls_in_chunk += 1
else:
print(
f"Warning: Mismatch between #responses ({len(responses) if responses else 0}) "
f"and #messages sent ({len(message_chunk)}) for chunk {i + 1}, or no responses. Marking all API calls in chunk as failed."
)
failed_api_calls_in_chunk = len(message_chunk)
# --- Update Main DataFrame and Save Periodically ---
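    # Only rows that match the label and are still NA get written, so a rerun
    # never overwrites classifications saved by an earlier run.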
rows_updated_this_chunk = 0
if chunk_task_classifications:
print(
f"Updating main DataFrame with classifications for {len(chunk_task_classifications)} unique tasks from chunk {i + 1}..."
)
for task_label, category in chunk_task_classifications.items():
# Update all rows in the main df that match this task_label AND are still NA in the output column
update_condition = (df["task"] == task_label) & (
df[OUTPUT_COLUMN_NAME].isna()
)
num_rows_for_this_task_label = df[update_condition].shape[0]
if num_rows_for_this_task_label > 0:
df.loc[update_condition, OUTPUT_COLUMN_NAME] = category
rows_updated_this_chunk += num_rows_for_this_task_label
print(
f"Updated {rows_updated_this_chunk} rows in the DataFrame based on this chunk's API responses."
)
print(f"Saving progress to {CLASSIFICATION_FILENAME}...")
save_dataframe(df, CLASSIFICATION_FILENAME)
else:
print(
f"No successful API classifications obtained in chunk {i + 1} to update DataFrame or save."
)
print(
f"Chunk {i + 1} API summary: Successful Calls={successful_api_calls_in_chunk}, Failed/Skipped Calls={failed_api_calls_in_chunk}. "
f"Rows updated in DataFrame this chunk: {rows_updated_this_chunk}"
)
processed_rows_count_total += rows_updated_this_chunk
# --- Rate Limiting Pause ---
chunk_end_time = time.time()
chunk_duration = chunk_end_time - chunk_start_time
print(f"Chunk {i + 1} (API calls and DF update) took {chunk_duration:.2f} seconds.")
if i < num_chunks - 1:
time_per_request = SECONDS_PER_MINUTE / RATE_LIMIT if RATE_LIMIT > 0 else 0
min_chunk_duration_for_rate = (
len(message_chunk) * time_per_request
) # Based on API calls made
pause_needed = max(0, min_chunk_duration_for_rate - chunk_duration)
if pause_needed > 0:
print(
f"Pausing for {pause_needed:.2f} seconds to respect rate limit ({RATE_LIMIT}/min)..."
)
time.sleep(pause_needed)
overall_end_time = time.time()
total_duration_minutes = (overall_end_time - overall_start_time) / 60
print(
f"\nBatch classification finished."
f" Updated {processed_rows_count_total} rows in '{CLASSIFICATION_FILENAME}' with new classifications in this run."
f" Total duration: {total_duration_minutes:.2f} minutes."
)
print(f"Performing final save to {CLASSIFICATION_FILENAME}...")
save_dataframe(df, CLASSIFICATION_FILENAME)
print("\nScript finished.")