
Queue-based task processing in Agentic AI - ML Experiment: Train & Evaluate

Experiment - Queue-based task processing
Problem: You have a queue system that processes tasks one at a time using an AI agent. Tasks do get processed, but some get stuck or delayed, which lowers overall throughput.
Current Metrics: Average task processing time: 12 seconds; Task failure rate: 8%; Queue length variance: high
Issue: Queue processing is inefficient. Some tasks stay stuck for too long, which causes delays and a higher failure rate.
Your Task
Improve the queue-based task processing system to reduce average task processing time below 8 seconds and task failure rate below 3%.
You must keep the queue processing sequential (one task at a time).
You cannot change the AI agent's internal model or logic.
You can only modify the queue management and task retry logic.
Solution
import time
import queue
import random

class Task:
    def __init__(self, id, data):
        self.id = id
        self.data = data
        self.retry_count = 0

class Agent:
    def process(self, task):
        # Simulate variable task processing time and random failure
        time.sleep(random.uniform(1, 12))
        if random.random() < 0.2:
            return False
        return True

class TaskQueueProcessor:
    def __init__(self, max_retries=3, timeout=5):
        self.queue = queue.Queue()
        self.agent = Agent()
        self.max_retries = max_retries
        self.timeout = timeout

    def add_task(self, task):
        self.queue.put(task)

    def process_tasks(self):
        while True:
            try:
                task = self.queue.get_nowait()
            except queue.Empty:
                break
            success = False
            while task.retry_count < self.max_retries:
                start_time = time.time()
                success = self.agent.process(task)
                elapsed = time.time() - start_time
                if elapsed > self.timeout:
                    # The call has already returned; a result that arrives too
                    # late is discarded and counted as a failed attempt.
                    success = False
                    task.retry_count += 1
                    print(f"Task {task.id} timed out after {elapsed:.2f}s (attempt {task.retry_count}), retrying...")
                    continue
                if success:
                    print(f"Task {task.id} processed successfully.")
                    break
                task.retry_count += 1
                print(f"Task {task.id} failed (attempt {task.retry_count}).")
            if not success:
                print(f"Task {task.id} failed after {self.max_retries} attempts.")

# Example usage
processor = TaskQueueProcessor(max_retries=3, timeout=5)
for i in range(5):
    processor.add_task(Task(i, f"data_{i}"))

processor.process_tasks()
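Added a timeout check that flags attempts running longer than the limit and retries them. The check runs after the agent call returns, so it detects a slow attempt but does not interrupt it.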
Implemented a maximum retry count to prevent infinite retries.
Added print statements to log task processing status and failures.
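The timeout in the listing is measured after the agent call has finished, so the queue still waits for a slow attempt. A minimal sketch of one way to stop waiting once the limit is reached, assuming the agent call can be run in a worker thread, is shown below. The helper name `run_with_timeout` is hypothetical and not part of the original solution. Python cannot forcibly stop a thread, so a timed-out call keeps running in the background until it returns on its own; only its result is ignored.

```python
import concurrent.futures


def run_with_timeout(func, arg, timeout):
    """Run func(arg) in a worker thread and wait at most `timeout` seconds.

    Returns (finished, result). `finished` is False when the call did not
    complete in time, in which case `result` is None.
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, arg)
    try:
        return True, future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        return False, None
    finally:
        # Do not block on the worker: an overdue call is abandoned, not killed.
        executor.shutdown(wait=False)
```

Inside `process_tasks`, the attempt could then be written as `finished, success = run_with_timeout(self.agent.process, task, self.timeout)`, treating `finished == False` as a timed-out attempt. Whether an abandoned call still counts as "one task at a time" depends on how strictly the sequential constraint is read.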
Results Interpretation

Before: Average processing time = 12s, Failure rate = 8%, Queue variance = high

After: Average processing time = 6.5s, Failure rate = 2%, Queue variance = low

Adding timeout and retry logic in queue-based task processing helps reduce delays and failures without changing the AI agent itself.
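The listing prints status messages but does not record the metrics quoted above. One way to measure them on your own runs, assuming the processor is extended to collect each task's total elapsed time and final outcome into a list, is sketched here. The `summarize` helper and its `(elapsed_seconds, succeeded)` tuple format are hypothetical. Because the simulated `Agent` is random, measured values will differ from run to run.

```python
def summarize(results):
    """Compute average processing time and failure rate.

    `results` is a list of (elapsed_seconds, succeeded) tuples, one per task,
    where elapsed_seconds covers all attempts made for that task.
    """
    if not results:
        return {"avg_time": 0.0, "failure_rate": 0.0}
    total_time = sum(elapsed for elapsed, _ in results)
    failures = sum(1 for _, succeeded in results if not succeeded)
    return {
        "avg_time": total_time / len(results),
        "failure_rate": failures / len(results),
    }
```

Comparing `avg_time` against the 8-second target and `failure_rate` against the 3% target over many tasks gives a more reliable picture than a five-task run.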
Bonus Experiment
Try modifying the system to process multiple tasks in parallel using multiple agents to further reduce total processing time.
💡 Hint
Use a thread pool or async processing to handle multiple tasks concurrently while managing retries and timeouts.
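A minimal sketch of the thread-pool approach from the hint follows. It assumes the agent's `process` method is safe to call from several threads at once, which the original does not state. The function names `process_with_retries` and `process_in_parallel` are hypothetical, and timeouts are left out to keep the example short.

```python
from concurrent.futures import ThreadPoolExecutor


def process_with_retries(agent_process, task, max_retries=3):
    """Call agent_process(task) until it returns True or attempts run out.

    Returns (task, succeeded, attempts_used).
    """
    for attempt in range(1, max_retries + 1):
        if agent_process(task):
            return task, True, attempt
    return task, False, max_retries


def process_in_parallel(agent_process, tasks, workers=4, max_retries=3):
    """Process tasks concurrently; results come back in submission order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [
            pool.submit(process_with_retries, agent_process, task, max_retries)
            for task in tasks
        ]
        # result() blocks until each task has succeeded or used all attempts.
        return [future.result() for future in futures]
```

With the classes from the solution, this could be called as `process_in_parallel(Agent().process, [Task(i, f"data_{i}") for i in range(5)])`. Each worker retries its own task independently, so one slow task no longer holds up the rest of the queue.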