Variational autoencoder on Theano

I am using the standard packages numpy, theano and matplotlib. I have compiled the theano with gpu support.

In [1]:
import numpy as np
import theano
import theano.tensor as T
import matplotlib.pyplot as plt
%matplotlib inline
Using gpu device 0: GeForce GT 750M (CNMeM is disabled, cuDNN 5005)

Define auxiliary variables:

In [2]:
ADAGRAD_EPS = 1e-10
rng = np.random.RandomState(1234)
floatX = theano.config.floatX

In variational autoencoders the loss function contains two parts:

  • KL divergence from the normal distribution with mean 0 and variance I on the code layer (output of encoder network);
  • Log-likelihood on the output layer (output of decoder network).

We define these two parts as separate functions:

In [3]:
def kld_unit_mvn(mu, var):
    return (mu.shape[1] + T.sum(T.log(var), axis=1) - T.sum(T.square(mu), axis=1) - T.sum(var, axis=1)) / 2.0

def log_diag_mvn(mu, var):
    def f(x):
        k = mu.shape[1]
        logp = (-k / 2.0) * np.log(2 * np.pi) - 0.5 * T.sum(T.log(var), axis=1) - T.sum(0.5 * (1.0 / var) * (x - mu) * (x - mu), axis=1)
        return logp
    return f

The building block of the neural network is class Layer that contains the matrix W of input weights and array b of input biases.

In [4]:
class Layer(object):
    def __init__(self, n_input, n_output, activation, x):

        W_init = np.asarray(
            rng.uniform(
                low=-np.sqrt(6. / (n_input + n_output)),
                high=np.sqrt(6. / (n_input + n_output)),
                size=(n_input, n_output)
            ),
            dtype=floatX
        )

        self.W = theano.shared(value = W_init.astype(floatX),
                               name = 'W',
                               borrow = True)

        self.b = theano.shared(value = np.zeros((n_output,), dtype=floatX),
                               name = 'b',
                               borrow = True)

        self.activation = activation

        self.params = [self.W, self.b]

        lin_output = T.dot(x, self.W) + self.b
        self.output = (lin_output if self.activation is None else self.activation(lin_output))

    def output2(self, x):
        lin_output = T.dot(x, self.W) + self.b
        return (lin_output if self.activation is None else self.activation(lin_output))

To build the variational autoencoder I am using the class GaussianMLP:

In [5]:
class GaussianMLP(object):
    def __init__(self, layer_sizes, activations, x, y=None, eps=None):

        # add layers to the network
        self.layers = []
        inp = x
        for n_input, n_output, activation in zip(layer_sizes[:-2], layer_sizes[1:-1], activations[1:-1]):
            self.layers.append(Layer(n_input, n_output, activation, inp))
            inp = self.layers[-1].output

        # add layers for reparametrization trick
        self.mu_layer = Layer(layer_sizes[-2], layer_sizes[-1], None, inp)
        self.logvar_layer = Layer(layer_sizes[-2], layer_sizes[-1], None, inp)

        self.params = []
        for layer in self.layers:
            self.params += layer.params
        self.params = self.params + self.mu_layer.params + self.logvar_layer.params

        mu = self.mu_layer.output
        var = T.exp(self.logvar_layer.output)
        sigma = T.sqrt(var)

        # define the cost function
        if eps:
            # encoder cost
            self.output = mu + sigma * eps
            self.cost = -T.sum(kld_unit_mvn(mu, var))
        else:
            # decoder cost
            self.mu = mu
            self.var = var
            self.output = mu
            self.cost = -T.sum(log_diag_mvn(self.output , var)(y))
            self.detailed_cost = log_diag_mvn(self.output , var)(y)

    def output2(self, x):
        for layer in self.layers:
            x = layer.output2(x)
        x = self.mu_layer.output2(x)
        return x

The variational autoencoder is an object of class VAE that contains two objects of the class GaussianMLP: the first object represents the encoder neural network, the second - the decoder neural network.

In [6]:
class VAE(object):
    def __init__(self, enc_layer_sizes, dec_layer_sizes, enc_activations, dec_activations, args):

        self.x = T.matrix('x', dtype=floatX)
        self.eps = T.matrix('eps', dtype=floatX)
  
        self.enc_mlp = GaussianMLP(enc_layer_sizes, enc_activations, self.x, eps=self.eps)
        self.dec_mlp = GaussianMLP(dec_layer_sizes, dec_activations, self.enc_mlp.output, y=self.x)

        self.cost = (self.enc_mlp.cost + self.dec_mlp.cost) / args['batch_size']

        self.params = self.enc_mlp.params + self.dec_mlp.params
        
        self.gparams = [T.grad(self.cost, p) + args['lmbda'] * p for p in self.params]
        self.gaccums = [theano.shared(value=np.zeros(p.get_value().shape, dtype=floatX)) for p in self.params]

        # define updates for the optimization
        self.updates = [
                (param, param - args['lr'] * gparam / T.sqrt(gaccum + T.square(gparam) + ADAGRAD_EPS))
                for param, gparam, gaccum in zip(self.params, self.gparams, self.gaccums)
        ]
        self.updates += [
            (gaccum, gaccum + T.square(gparam))
            for gaccum, gparam in zip(self.gaccums, self.gparams)
        ]

        self.get_dec_cost = theano.function(
            inputs=[self.x, self.eps],
            outputs=self.dec_mlp.cost
        )

        self.get_dec_mu = theano.function(
            inputs=[self.x, self.eps],
            outputs = self.dec_mlp.mu
        )

        self.get_dec_var = theano.function(
            inputs=[self.x, self.eps],
            outputs = self.dec_mlp.var
        )

        self.get_dec_detailed_cost = theano.function(
            inputs=[self.x, self.eps],
            outputs=self.dec_mlp.detailed_cost
        )

        self.get_enc_cost = theano.function(
            inputs=[self.x],
            outputs=self.enc_mlp.cost
        )

        # define the train function
        self.train = theano.function(
            inputs=[self.x, self.eps],
            outputs=self.cost,
            updates=self.updates
        )

        # get codes
        self.encode = theano.function(
            inputs=[self.x, self.eps],
            outputs=self.enc_mlp.output
        )

        self.decode = theano.function(
            inputs=[self.enc_mlp.output],
            outputs=self.dec_mlp.output
        )

        # decoding
        self.decode2 = theano.function(
            inputs=[self.eps],
            outputs=self.dec_mlp.output2(self.eps)
        )

The main code starts here. First, I create a toy dataset:

In [7]:
np.random.seed(0)
N = 1000
X = np.random.rand(N)
X = np.transpose(np.vstack((X, -1.0*np.sqrt(0.25 - (X - 0.5)**2) + 0.5  + 0.05*np.random.randn(N))).astype(floatX))

plt.figure(figsize = (8,4))
plt.scatter(X[:, 0], X[:, 1], lw=.3, s=3, cmap=plt.cm.cool)
plt.axis([0, 1, 0, 0.5])
plt.show()

The desired output of our autoencoder is the projection of original points on the semicircle. Another aim of the variational autoencoder is to generate points on the semicircle from uniform distribution (or any convenient type of distribution, for example, unit normal distribution). I have decided to use uniform distribution in this problem because of the structure of the toy dataset. The next step is to define the autoencoder:

In [8]:
args = {}
args['batch_size'] = 1000
args['lmbda'] = 0.0
args['lr'] = 0.1
args['epochs'] = 5000

num_batches = X.shape[0] / args['batch_size']

enc_layer_sizes = [X.shape[1], 10, 10, 1]
enc_activations = [None, T.tanh, None, None]
dec_layer_sizes = [enc_layer_sizes[-1], 10, 10, X.shape[1]]
dec_activations = [None, None, T.tanh, None]

vae = VAE(enc_layer_sizes, dec_layer_sizes, enc_activations, dec_activations, args)

I am using nnvisual.py code to show how the autoencoder looks like. This code is modified code from here: https://github.com/miloharper/visualise-neural-network (thanks to miloharper for it). I have changed the orientation of the neural network in the original file.

In [9]:
from nnvisual import *
network = NeuralNetwork()
for l in enc_layer_sizes + dec_layer_sizes[1:]:
    network.add_layer(l)
network.draw()

The main training loop:

In [10]:
np.random.seed(232)
for epoch in xrange(args['epochs']*num_batches):
    k = epoch % num_batches
    x = X[k * args['batch_size']:(k+1) * args['batch_size'], :]
    eps = np.random.rand(x.shape[0], enc_layer_sizes[-1]).astype(floatX)
    cost = vae.train(x, eps)

Let us see how well the autoencoder produces the result from the original data:

In [11]:
np.random.seed(2234)
eps = np.random.rand(X.shape[0], enc_layer_sizes[-1]).astype(floatX)
Z = vae.encode(X, eps)
X_projected = vae.decode(Z)

plt.figure(figsize = (8,4))
plt.scatter(X[:, 0], X[:, 1], c='blue', lw=.3, s=3)
plt.scatter(X_projected[:, 0], X_projected[:, 1], c='red', lw=.3, s=3)
plt.axis([0, 1, 0, 0.5])
plt.show()

And finally, we will see how our variational autoencoder produces points from the uniform distribution:

In [12]:
np.random.seed(25)
eps = 6.0*np.random.rand(X.shape[0], enc_layer_sizes[-1]).astype(floatX) - 3.0
X_produced = vae.decode2(eps)

plt.figure(figsize = (8,4))
plt.scatter(X_produced[:, 0], X_produced[:, 1], c='green', lw=.3, s=3)
plt.axis([0, 1, 0, 0.5])
plt.show()