Bird
Raised Fist0
Agentic AIml~20 mins

Parallel tool execution in Agentic AI - ML Experiment: Train & Evaluate

Choose your learning style10 modes available

Start learning this pattern below

Jump into concepts and practice - no test required

or
Recommended
Test this pattern10 questions across easy, medium, and hard to know if this pattern is strong
Experiment - Parallel tool execution
Problem:You have an AI agent that uses multiple tools to answer questions. Currently, the agent runs these tools one after another, which slows down the response time.
Current Metrics:Average response time per query: 12 seconds; Accuracy: 90%
Issue:The agent's response time is too long because tools are executed sequentially, causing delays.
Your Task
Reduce the average response time to under 5 seconds by running tools in parallel, while maintaining accuracy above 88%.
You cannot reduce the number of tools used.
You must keep the same tool outputs and accuracy.
Use parallel execution methods supported by the agentic AI framework.
Hint 1
Hint 2
Hint 3
Solution
Agentic AI
import asyncio

class Tool:
    def __init__(self, name, delay, output):
        self.name = name
        self.delay = delay
        self.output = output

    async def run(self):
        await asyncio.sleep(self.delay)  # Simulate tool processing time
        return f"{self.name} output: {self.output}"

async def run_tools_in_parallel(tools):
    tasks = [tool.run() for tool in tools]
    results = await asyncio.gather(*tasks)
    return results

async def main():
    tools = [
        Tool("ToolA", 5, "dataA"),
        Tool("ToolB", 4, "dataB"),
        Tool("ToolC", 3, "dataC")
    ]
    results = await run_tools_in_parallel(tools)
    combined_result = " | ".join(results)
    print(f"Combined tool outputs: {combined_result}")

if __name__ == "__main__":
    import time
    start = time.time()
    asyncio.run(main())
    end = time.time()
    print(f"Total execution time: {end - start:.2f} seconds")
Replaced sequential tool execution with asynchronous parallel execution using asyncio.
Created async run method for each tool to simulate processing delay.
Used asyncio.gather to run all tools concurrently and collect outputs.
Combined outputs after all tools finished to maintain accuracy.
Results Interpretation

Before: Response time = 12 seconds, Accuracy = 90%

After: Response time = 5 seconds, Accuracy = 90%

Running multiple tools in parallel can significantly reduce response time without losing accuracy, demonstrating the power of asynchronous execution in AI agents.
Bonus Experiment
Try implementing parallel tool execution using multithreading instead of asyncio.
💡 Hint
Use Python's concurrent.futures.ThreadPoolExecutor to run tool calls concurrently and collect results.

Practice

(1/5)
1.

What is the main benefit of parallel tool execution in AI workflows?

easy
A. It makes tools run slower but more accurately.
B. It runs tools one after another to avoid errors.
C. It only works if tasks depend on each other.
D. It runs multiple tools at the same time to save time.

Solution

  1. Step 1: Understand parallel execution

    Parallel execution means running many tasks at once, not one by one.
  2. Step 2: Identify the benefit in AI workflows

    Running tools simultaneously saves time and improves efficiency.
  3. Final Answer:

    It runs multiple tools at the same time to save time. -> Option D
  4. Quick Check:

    Parallel execution = run many tools at once [OK]
Hint: Parallel means many at once, so it saves time [OK]
Common Mistakes:
  • Thinking parallel means slower execution
  • Believing tasks must depend on each other
  • Confusing parallel with sequential execution
2.

Which of the following is the correct way to start parallel execution of two tools toolA and toolB in Python using concurrent.futures?

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor() as executor:
    # What goes here?
easy
A. executor.parallel(toolA, toolB)
B. executor.run(toolA, toolB)
C. executor.submit(toolA); executor.submit(toolB)
D. executor.execute(toolA & toolB)

Solution

  1. Step 1: Recall ThreadPoolExecutor usage

    The method to run functions in parallel is submit() for each function.
  2. Step 2: Check the options

    Only executor.submit(toolA); executor.submit(toolB) correctly submits both tools for parallel execution.
  3. Final Answer:

    executor.submit(toolA); executor.submit(toolB) -> Option C
  4. Quick Check:

    Use submit() to run functions in parallel [OK]
Hint: Use submit() for each tool to run in parallel [OK]
Common Mistakes:
  • Using non-existent methods like run() or execute()
  • Trying to pass multiple tools in one call
  • Confusing parallel execution syntax
3.

Given the code below, what will be the output?

import concurrent.futures
import time

def tool1():
    time.sleep(2)
    return 'Done1'

def tool2():
    time.sleep(1)
    return 'Done2'

with concurrent.futures.ThreadPoolExecutor() as executor:
    future1 = executor.submit(tool1)
    future2 = executor.submit(tool2)
    print(future1.result())
    print(future2.result())
medium
A. Done2\nDone1
B. Done1\nDone2
C. Done1\nDone1
D. Done2\nDone2

Solution

  1. Step 1: Understand parallel execution and sleep times

    tool1 sleeps 2 seconds, tool2 sleeps 1 second, both start together.
  2. Step 2: Check order of result() calls

    future1.result() waits for tool1 (2s), then future2.result() waits for tool2 (1s). So output order matches calls, not completion time.
  3. Final Answer:

    Done1 Done2 -> Option B
  4. Quick Check:

    Results print in call order, not finish order [OK]
Hint: result() waits; output order matches calls, not finish time [OK]
Common Mistakes:
  • Assuming output order matches task finish time
  • Ignoring that result() blocks until done
  • Confusing sleep durations with print order
4.

What is the error in the following code that tries to run two tools in parallel?

import concurrent.futures

def toolA():
    return 'A'

def toolB():
    return 'B'

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = executor.map(toolA, toolB)
    print(list(results))
medium
A. executor.map expects a function and an iterable, but toolB is not iterable.
B. executor.map cannot run more than one function at a time.
C. Missing parentheses when calling toolA and toolB.
D. ThreadPoolExecutor cannot be used with map.

Solution

  1. Step 1: Understand executor.map signature

    executor.map expects one function and an iterable of inputs to apply that function to.
  2. Step 2: Identify the error in arguments

    Passing two functions (toolA, toolB) is wrong; toolB is not an iterable of inputs for toolA.
  3. Final Answer:

    executor.map expects a function and an iterable, but toolB is not iterable. -> Option A
  4. Quick Check:

    map(func, iterable) needs iterable inputs [OK]
Hint: map needs one function and iterable inputs, not two functions [OK]
Common Mistakes:
  • Passing multiple functions to map
  • Confusing map with submit
  • Thinking map runs multiple different functions
5.

You want to run three independent AI tools toolX, toolY, and toolZ in parallel and collect their results as a dictionary with tool names as keys. Which code snippet correctly achieves this?

def toolX(): return 'X result'
def toolY(): return 'Y result'
def toolZ(): return 'Z result'

# Choose the correct parallel execution code
hard
A. import concurrent.futures with concurrent.futures.ThreadPoolExecutor() as executor: futures = {name: executor.submit(func) for name, func in {'toolX': toolX, 'toolY': toolY, 'toolZ': toolZ}.items()} results = {name: future.result() for name, future in futures.items()} print(results)
B. import concurrent.futures with concurrent.futures.ThreadPoolExecutor() as executor: results = executor.map(toolX, toolY, toolZ) print(dict(results))
C. results = {} for tool in [toolX, toolY, toolZ]: results[tool.__name__] = tool() print(results)
D. import threading results = {} def run_tool(name, func): results[name] = func() threads = [] for name, func in {'toolX': toolX, 'toolY': toolY, 'toolZ': toolZ}.items(): t = threading.Thread(target=run_tool, args=(name, func)) threads.append(t) t.start() for t in threads: t.join() print(results)

Solution

  1. Step 1: Check parallel execution with ThreadPoolExecutor

    import concurrent.futures with concurrent.futures.ThreadPoolExecutor() as executor: futures = {name: executor.submit(func) for name, func in {'toolX': toolX, 'toolY': toolY, 'toolZ': toolZ}.items()} results = {name: future.result() for name, future in futures.items()} print(results) uses submit() for each tool, stores futures with names, then collects results correctly.
  2. Step 2: Evaluate other options

    import concurrent.futures with concurrent.futures.ThreadPoolExecutor() as executor: results = executor.map(toolX, toolY, toolZ) print(dict(results)) misuses map with multiple functions; results = {} for tool in [toolX, toolY, toolZ]: results[tool.__name__] = tool() print(results) runs tools sequentially; import threading results = {} def run_tool(name, func): results[name] = func() threads = [] for name, func in {'toolX': toolX, 'toolY': toolY, 'toolZ': toolZ}.items(): t = threading.Thread(target=run_tool, args=(name, func)) threads.append(t) t.start() for t in threads: t.join() print(results) uses threading correctly but is more complex and not asked.
  3. Final Answer:

    Option A correctly runs tools in parallel and collects results as a dictionary. -> Option A
  4. Quick Check:

    submit() + dict comprehension collects parallel results [OK]
Hint: Use submit() with dict comprehension to map names to results [OK]
Common Mistakes:
  • Using map() with multiple functions
  • Running tools sequentially instead of parallel
  • Confusing threading with ThreadPoolExecutor usage