import numpy as np
import random
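
# Two tabular Q-learning agents train in separate copies of the same gridworld
# and pool their transitions in a shared buffer: each agent learns from its own
# steps online, then replays the pooled experience at the end of every episode.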

class Environment:
    """size x size gridworld: start at (0, 0), reach the opposite corner."""

    def __init__(self, size=5):
        self.size = size
        self.goal = (size - 1, size - 1)

    def reset(self):
        self.agent_pos = (0, 0)
        return self.agent_pos

    def step(self, action):
        x, y = self.agent_pos
        if action == 0 and x > 0:                # left
            x -= 1
        elif action == 1 and x < self.size - 1:  # right
            x += 1
        elif action == 2 and y > 0:              # up
            y -= 1
        elif action == 3 and y < self.size - 1:  # down
            y += 1
        self.agent_pos = (x, y)
        # Small per-step penalty rewards short paths; reaching the goal ends the episode.
        reward = 1 if self.agent_pos == self.goal else -0.1
        done = self.agent_pos == self.goal
        return self.agent_pos, reward, done
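
# Tabular Q-learning agent. Actions are encoded 0-3 (left, right, up, down) to
# match Environment.step; shared_memory is any list of
# (state, action, reward, next_state) tuples pooled across agents.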
class Agent:
    def __init__(self, env, shared_memory=None):
        self.env = env
        self.q_table = np.zeros((env.size, env.size, 4))
        self.epsilon = 0.2  # exploration rate
        self.alpha = 0.5    # learning rate
        self.gamma = 0.9    # discount factor
        self.shared_memory = shared_memory

    def choose_action(self, state):
        # Epsilon-greedy: random action with probability epsilon, else greedy.
        if random.random() < self.epsilon:
            return random.randint(0, 3)
        x, y = state
        return np.argmax(self.q_table[x, y])

    def _q_update(self, state, action, reward, next_state):
        # One Q-learning step: Q(s,a) += alpha * (r + gamma * max_a' Q(s',a') - Q(s,a)).
        x, y = state
        nx, ny = next_state
        predict = self.q_table[x, y, action]
        target = reward + self.gamma * np.max(self.q_table[nx, ny])
        self.q_table[x, y, action] += self.alpha * (target - predict)

    def learn(self, state, action, reward, next_state):
        self._q_update(state, action, reward, next_state)
        # Publish the transition so other agents can replay it later.
        if self.shared_memory is not None:
            self.shared_memory.append((state, action, reward, next_state))

    def update_from_shared(self):
        # Replay the pooled transitions against this agent's own Q-table.
        # (An agent also revisits its own transitions here, giving them a second pass.)
        if self.shared_memory is None:
            return
        for state, action, reward, next_state in self.shared_memory:
            self._q_update(state, action, reward, next_state)
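
# Train two agents in parallel episodes. An episode counts as a success only
# when both agents reach their goals within the step limit; avg_steps is the
# mean episode length over successful episodes.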
def train_multi_agent(episodes=500, max_steps=50):
    env1 = Environment()
    env2 = Environment()
    shared_memory = []
    agent1 = Agent(env1, shared_memory)
    agent2 = Agent(env2, shared_memory)
    success_count = 0
    total_steps = 0
    for ep in range(episodes):
        state1 = env1.reset()
        state2 = env2.reset()
        done1 = done2 = False
        steps = 0
        while not (done1 and done2) and steps < max_steps:
            if not done1:
                action1 = agent1.choose_action(state1)
                next_state1, reward1, done1 = env1.step(action1)
                agent1.learn(state1, action1, reward1, next_state1)
                state1 = next_state1
            if not done2:
                action2 = agent2.choose_action(state2)
                next_state2, reward2, done2 = env2.step(action2)
                agent2.learn(state2, action2, reward2, next_state2)
                state2 = next_state2
            steps += 1
        # Both agents replay the pooled experience, then the buffer is cleared
        # so each transition is replayed only once; without the clear, the
        # buffer grows without bound and every episode re-replays all history.
        agent1.update_from_shared()
        agent2.update_from_shared()
        shared_memory.clear()
        if done1 and done2:
            success_count += 1
            total_steps += steps  # only successful episodes count toward the average
    success_rate = success_count / episodes * 100
    avg_steps = total_steps / success_count if success_count > 0 else max_steps
    return success_rate, avg_steps

if __name__ == '__main__':
    success_rate, avg_steps = train_multi_agent()
    print(f"Multi-agent success rate: {success_rate:.1f}%, average steps: {avg_steps:.1f}")