import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np
# Load CIFAR-10 and reduce it to a binary cat-vs-dog task (class 3 = cat, class 5 = dog)
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()
# Keep only rows labeled 3 or 5; the labels have shape (N, 1), so [0] selects
# the matching row indices
train_filter = np.where((y_train == 3) | (y_train == 5))[0]
test_filter = np.where((y_test == 3) | (y_test == 5))[0]
x_train, y_train = x_train[train_filter], y_train[train_filter]
x_test, y_test = x_test[test_filter], y_test[test_filter]
# Normalize images
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.
# Convert labels to 0 (cat) and 1 (dog)
y_train = (y_train == 5).astype('float32')
y_test = (y_test == 5).astype('float32')
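# Quick sanity check: CIFAR-10 has 5,000 train / 1,000 test images per class,
# so the binary subset should contain 10,000 train and 2,000 test images
print('Train:', x_train.shape, 'Test:', x_test.shape)
print('Dog fraction (train):', float(y_train.mean()))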
# Define VAE encoder
latent_dim = 64
class Sampling(layers.Layer):
    """Reparameterization trick: draw z = z_mean + std * eps with eps ~ N(0, I)."""
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.random.normal(shape=(batch, dim))
        # exp(0.5 * log_var) turns the log-variance into a standard deviation
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon
encoder_inputs = layers.Input(shape=(32, 32, 3))
# Two strided convolutions downsample 32x32 -> 16x16 -> 8x8
x = layers.Conv2D(32, 3, activation='relu', strides=2, padding='same')(encoder_inputs)
x = layers.Conv2D(64, 3, activation='relu', strides=2, padding='same')(x)
x = layers.Flatten()(x)
x = layers.Dense(128, activation='relu')(x)
# Two heads parameterize the approximate posterior q(z|x)
z_mean = layers.Dense(latent_dim)(x)
z_log_var = layers.Dense(latent_dim)(x)
z = Sampling()([z_mean, z_log_var])
encoder = models.Model(encoder_inputs, [z_mean, z_log_var, z], name='encoder')
# Define VAE decoder
latent_inputs = layers.Input(shape=(latent_dim,))
x = layers.Dense(8 * 8 * 64, activation='relu')(latent_inputs)
x = layers.Reshape((8, 8, 64))(x)
# Mirror the encoder: transposed convolutions upsample 8x8 -> 16x16 -> 32x32
x = layers.Conv2DTranspose(64, 3, strides=2, activation='relu', padding='same')(x)
x = layers.Conv2DTranspose(32, 3, strides=2, activation='relu', padding='same')(x)
# Sigmoid output matches the [0, 1] range of the normalized pixels
decoder_outputs = layers.Conv2DTranspose(3, 3, activation='sigmoid', padding='same')(x)
decoder = models.Model(latent_inputs, decoder_outputs, name='decoder')
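# Shape check (a minimal sketch; any small batch works): push a few images
# through both halves and confirm the reconstruction matches the 32x32x3 input
sample_mean, sample_log_var, sample_z = encoder(x_train[:4])
print('Latent shape:', sample_z.shape)                   # expected: (4, 64)
print('Reconstruction shape:', decoder(sample_z).shape)  # expected: (4, 32, 32, 3)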
# Define VAE model
class VAE(models.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder

    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        # Closed-form KL divergence between q(z|x) = N(mean, var) and the
        # standard normal prior, averaged over the batch and latent dimensions
        kl_loss = -0.5 * tf.reduce_mean(
            1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        self.add_loss(kl_loss)
        return reconstructed
vae = VAE(encoder, decoder)
# 'mse' on (input, input) supplies the reconstruction term; the KL term is
# attached inside call() via add_loss
vae.compile(optimizer='adam', loss='mse')
# Train VAE
vae.fit(x_train, x_train, epochs=10, batch_size=64, validation_split=0.1, verbose=0)
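# Optional visualization (a sketch; assumes matplotlib is available): compare a
# few test images with their VAE reconstructions
import matplotlib.pyplot as plt
recon = vae.predict(x_test[:8], verbose=0)
fig, axes = plt.subplots(2, 8, figsize=(12, 3))
for i in range(8):
    axes[0, i].imshow(x_test[i])   # top row: originals
    axes[1, i].imshow(recon[i])    # bottom row: reconstructions
    axes[0, i].axis('off')
    axes[1, i].axis('off')
plt.show()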
# Extract latent features for classification: use the posterior mean z_mean as
# a deterministic embedding (ignoring the sampled z and the log-variance)
z_mean_train, _, _ = encoder.predict(x_train)
z_mean_test, _, _ = encoder.predict(x_test)
# Simple classifier on latent space
clf = models.Sequential([
    layers.Input(shape=(latent_dim,)),
    layers.Dense(32, activation='relu'),
    layers.Dense(1, activation='sigmoid')
])
clf.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
clf.fit(z_mean_train, y_train, epochs=20, batch_size=32, validation_split=0.1, verbose=0)
# Evaluate classifier
train_loss, train_acc = clf.evaluate(z_mean_train, y_train, verbose=0)
test_loss, test_acc = clf.evaluate(z_mean_test, y_test, verbose=0)
print(f'Training accuracy: {train_acc*100:.2f}%')
print(f'Test accuracy: {test_acc*100:.2f}%')
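# Generative check (a sketch; reuses matplotlib from the visualization above):
# since the KL term pulls q(z|x) toward N(0, I), decoding draws from the
# standard normal prior should produce plausible, if blurry, images
samples = decoder.predict(np.random.normal(size=(8, latent_dim)).astype('float32'), verbose=0)
fig, axes = plt.subplots(1, 8, figsize=(12, 2))
for i in range(8):
    axes[i].imshow(samples[i])
    axes[i].axis('off')
plt.show()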