import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data
class SequenceDataset(torch.utils.data.Dataset):
    """1000 synthetic samples of "count up mod 10" digit sequences.

    Each sample is an (input, target) pair of length-4 LongTensors where
    target[t] == (input[t] + 1) % 10 — i.e. the target is the input
    shifted one step into the future, for next-number prediction.
    """

    def __init__(self):
        self.data = []
        for _ in range(1000):
            # random starting digit; the rest of the sequence is determined
            first = torch.randint(0, 10, (1,)).item()
            full = torch.tensor([(first + k) % 10 for k in range(5)],
                                dtype=torch.long)
            # split into (first 4 digits, next 4 digits) for teacher forcing
            self.data.append((full[:-1], full[1:]))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]
class RNNPredictor(nn.Module):
    """Embedding -> vanilla RNN -> linear head for per-step digit logits.

    Emits one logit vector over the output classes at every time step.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        # 10 possible digits, each mapped to an `input_size`-dim vector
        self.embedding = nn.Embedding(10, input_size)
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Map a (batch, seq_len) LongTensor to (batch, seq_len, output_size) logits."""
        embedded = self.embedding(x)          # (batch, seq_len, input_size)
        hidden_states, _ = self.rnn(embedded) # (batch, seq_len, hidden_size)
        return self.fc(hidden_states)         # (batch, seq_len, output_size)
# ----- Hyperparameters -----
input_size = 8      # embedding dimension per digit
hidden_size = 16    # RNN hidden-state size
output_size = 10    # one logit per digit class
batch_size = 32
epochs = 10

# ----- Data: 80/20 train/validation split -----
dataset = SequenceDataset()
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(
    dataset, [train_size, val_size]
)
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True
)
val_loader = torch.utils.data.DataLoader(
    val_dataset, batch_size=batch_size, shuffle=False
)

# ----- Model, loss, optimizer -----
model = RNNPredictor(input_size, hidden_size, output_size)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)
# Training loop: optimize on the train split, then report loss/accuracy on val
for epoch in range(epochs):
    # --- optimization pass ---
    model.train()
    running_loss = 0.0
    for batch_inputs, batch_targets in train_loader:
        optimizer.zero_grad()
        logits = model(batch_inputs)  # (batch, seq_len, output_size)
        # flatten batch and time dims so CrossEntropyLoss sees 2-D logits
        batch_loss = criterion(
            logits.view(-1, output_size), batch_targets.view(-1)
        )
        batch_loss.backward()
        optimizer.step()
        running_loss += batch_loss.item()
    avg_train_loss = running_loss / len(train_loader)

    # --- evaluation pass (no gradient tracking) ---
    model.eval()
    val_running = 0.0
    n_correct = 0
    n_seen = 0
    with torch.no_grad():
        for batch_inputs, batch_targets in val_loader:
            logits = model(batch_inputs)
            val_running += criterion(
                logits.view(-1, output_size), batch_targets.view(-1)
            ).item()
            # per-position predicted digit vs ground truth
            predictions = logits.argmax(dim=2).view(-1)
            n_correct += (predictions == batch_targets.view(-1)).sum().item()
            n_seen += batch_targets.numel()
    avg_val_loss = val_running / len(val_loader)
    val_acc = 100 * n_correct / n_seen
    print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}, Val Acc: {val_acc:.2f}%")
# After training, run the model on one hand-written sequence
with torch.no_grad():
    sample_seq = torch.tensor([[1, 2, 3, 4]])  # shape (1, 4): batch of one
    logits = model(sample_seq)
    # most likely next digit at every position
    predicted_numbers = logits.argmax(dim=2).squeeze().tolist()
    print(f"Input sequence: {sample_seq.squeeze().tolist()}")
    print(f"Predicted next numbers: {predicted_numbers}")