import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
# Define agent model with communication input
class AgentModel(tf.keras.Model):
    """Single agent that predicts a probability from an observation and a message.

    The observation is encoded by a 64-unit ReLU layer, concatenated with the
    communication vector, passed through a second 64-unit ReLU layer and a
    dropout layer (rate 0.3), and mapped to one sigmoid output unit.

    Args:
        input_dim: Width of the observation vector. Accepted for interface
            compatibility; not used by this class, since the Keras layers
            infer their input widths when the model is first called.
        comm_dim: Width of the communication vector. Accepted for interface
            compatibility; not used by this class for the same reason.
    """

    def __init__(self, input_dim, comm_dim):
        super().__init__()
        self.dense1 = layers.Dense(64, activation='relu')
        self.concat = layers.Concatenate()
        self.dense2 = layers.Dense(64, activation='relu')
        self.dropout = layers.Dropout(0.3)
        self.output_layer = layers.Dense(1, activation='sigmoid')

    def call(self, inputs, training=None):
        """Run the forward pass.

        Args:
            inputs: A pair ``(obs, comm)``: the observation batch and the
                communication batch. Both are assumed to share the same
                leading (batch) dimension, as required by the concatenation.
            training: Whether to run in training mode. Forwarded to the
                dropout layer so that units are only dropped while training.
                ``None`` defers to Keras' own default (inference unless an
                enclosing call set training mode).

        Returns:
            The output of the final sigmoid layer (last dimension of size 1).
        """
        obs, comm = inputs
        x = self.dense1(obs)
        x = self.concat([x, comm])
        x = self.dense2(x)
        # Declaring and forwarding `training` explicitly is what lets callers
        # toggle dropout; without it the flag may never reach this layer.
        x = self.dropout(x, training=training)
        return self.output_layer(x)
# Simulate communication vector as average of other agents' outputs
def simulate_communication(agent_outputs):
    """Build each agent's incoming message as the mean of the other agents' outputs.

    Args:
        agent_outputs: Sequence with one output tensor per agent. The tensors
            are stacked together, so they are assumed to share a shape.

    Returns:
        A list with one tensor per agent, in the same order as
        ``agent_outputs``. Entry ``i`` is the element-wise mean of every
        output except output ``i``. An empty input yields an empty list. A
        single agent has no peers, so it receives an all-zero message of the
        same shape and dtype as its own output instead of failing on an
        empty stack.
    """
    num_agents = len(agent_outputs)
    if num_agents == 0:
        return []
    if num_agents == 1:
        # No peers to average over; a zero message means "nothing received".
        return [tf.zeros_like(agent_outputs[0])]

    comm_vectors = []
    for receiver in range(num_agents):
        others = [
            output
            for sender, output in enumerate(agent_outputs)
            if sender != receiver
        ]
        comm_vectors.append(tf.reduce_mean(tf.stack(others), axis=0))
    return comm_vectors
# Training loop with shared team reward
def train_multi_agent(agents, data, labels, epochs=20, batch_size=32):
    """Jointly train all agents on a shared team loss.

    Every batch is processed in two forward passes. First each agent sees the
    observations with an all-zero message; the resulting outputs are turned
    into per-agent messages by ``simulate_communication``. Then each agent
    predicts again with its message. The team prediction is the mean of the
    second-pass predictions, and the loss is the mean binary cross-entropy of
    that team prediction against the labels.

    Forward passes are made with ``training=True`` to request training-mode
    behaviour from the agents.

    Args:
        agents: Sequence of agent models. Each is called as
            ``agent((observations, message), training=True)``.
        data: Observations, sliced along the first axis into mini-batches.
        labels: Binary targets aligned with ``data`` along the first axis.
            Assumed to match the shape of the team prediction -- confirm
            against the caller.
        epochs: Number of passes over ``data``.
        batch_size: Maximum number of samples per mini-batch; the last batch
            of an epoch may be smaller.

    Returns:
        None. Prints the mean of the per-batch losses after each epoch (no
        line is printed for an epoch that had no batches).
    """
    optimizer = optimizers.Adam(learning_rate=0.001)
    num_samples = len(data)

    for epoch in range(epochs):
        batch_losses = []
        for batch_start in range(0, num_samples, batch_size):
            batch_end = batch_start + batch_size
            batch_data = data[batch_start:batch_end]
            batch_labels = labels[batch_start:batch_end]
            # Placeholder message for the first pass; width 1 matches the
            # width hard-coded by the original first-pass call.
            empty_comm = tf.zeros((batch_data.shape[0], 1))

            with tf.GradientTape() as tape:
                agent_outputs = [
                    agent((batch_data, empty_comm), training=True)
                    for agent in agents
                ]
                comm_vectors = simulate_communication(agent_outputs)
                agent_preds = [
                    agent((batch_data, comm), training=True)
                    for agent, comm in zip(agents, comm_vectors)
                ]
                # Team prediction is the mean of the agents' predictions;
                # the shared loss is its mean binary cross-entropy.
                team_preds = tf.reduce_mean(tf.stack(agent_preds), axis=0)
                loss = tf.reduce_mean(
                    tf.keras.losses.binary_crossentropy(batch_labels, team_preds)
                )

            # Collected after the forward pass because the agents create
            # their weights lazily on first call. One joint update keeps the
            # single optimizer's step counter at one tick per batch and
            # avoids handing it variable sets it was not built with.
            team_variables = [
                variable
                for agent in agents
                for variable in agent.trainable_variables
            ]
            grads = tape.gradient(loss, team_variables)
            optimizer.apply_gradients(zip(grads, team_variables))
            batch_losses.append(float(loss.numpy()))

        if batch_losses:
            epoch_loss = sum(batch_losses) / len(batch_losses)
            print(f"Epoch {epoch+1}, Loss: {epoch_loss:.4f}")
# Seed Python, NumPy and TensorFlow together so that the generated data,
# the weight initialisation and the dropout masks are repeatable.
tf.keras.utils.set_random_seed(42)

# Create 3 agents
input_dim = 10
comm_dim = 1
agents = [AgentModel(input_dim, comm_dim) for _ in range(3)]

# Dummy data: uniform features in [0, 1); the label is 1 when a row sums
# to more than 5.
data = np.random.rand(1000, input_dim).astype(np.float32)
labels = (np.sum(data, axis=1) > 5).astype(np.float32).reshape(-1, 1)

# Hold out the last 20% of the samples so the reported validation accuracy
# is measured on data the agents were not trained on.
num_train = int(0.8 * len(data))
train_data, val_data = data[:num_train], data[num_train:]
train_labels, val_labels = labels[:num_train], labels[num_train:]

# Train
train_multi_agent(agents, train_data, train_labels)

# Evaluate team performance on the held-out split, in inference mode.
# Same two-pass scheme as training: zero message first, then real messages.
empty_comm = tf.zeros((len(val_data), comm_dim))
agent_outputs = [agent((val_data, empty_comm), training=False) for agent in agents]
comm_vectors = simulate_communication(agent_outputs)
agent_preds = [
    agent((val_data, comm), training=False)
    for agent, comm in zip(agents, comm_vectors)
]
team_preds = tf.reduce_mean(tf.stack(agent_preds), axis=0).numpy()
team_accuracy = np.mean((team_preds > 0.5) == (val_labels > 0.5))
print(f"Team validation accuracy: {team_accuracy * 100:.2f}%")