import numpy as np
import sklearn
import tensorflow as tf
import edward as ed
from edward.models import Categorical, Mixture, MultivariateNormalDiag
from cde.utils.tf_utils.network import MLP
import cde.utils.tf_utils.layers as L
from cde.utils.tf_utils.layers_powered import LayersPowered
from cde.utils.serializable import Serializable
from .BaseNNMixtureEstimator import BaseNNMixtureEstimator
class MixtureDensityNetwork(BaseNNMixtureEstimator):
""" Mixture Density Network Estimator
See "Mixture Density networks", Bishop 1994
Args:
name: (str) name scope of the MDN (should be unique within the code; otherwise TensorFlow namespace collisions may arise)
ndim_x: (int) dimensionality of x variable
ndim_y: (int) dimensionality of y variable
n_centers: (int) number of Gaussian mixture components
hidden_sizes: (tuple of int) sizes of the hidden layers of the neural network
hidden_nonlinearity: (tf function) nonlinearity of the hidden layers
n_training_epochs: (int) number of epochs for training
x_noise_std: (optional) standard deviation of Gaussian noise over the training data X -> regularization through noise
y_noise_std: (optional) standard deviation of Gaussian noise over the training data Y -> regularization through noise
entropy_reg_coef: (optional) scalar float coefficient for a Shannon entropy penalty on the mixture component weight distribution
weight_decay: (float) the amount of decoupled (http://arxiv.org/abs/1711.05101) weight decay to apply
weight_normalization: (boolean) whether weight normalization shall be used
data_normalization: (boolean) whether to normalize the data (X and Y) to zero mean and unit standard deviation
dropout: (float) the probability of switching off nodes during training
random_seed: (optional) seed (int) of the random number generators used
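Example (a minimal sketch, assuming a TF1-compatible environment with edward installed, and that pdf() is provided by the base estimator, as the has_pdf flag suggests)::

    import numpy as np
    X = np.random.normal(size=(1000, 1))
    Y = 2.0 * X + np.random.normal(scale=0.5, size=(1000, 1))
    mdn = MixtureDensityNetwork("mdn_demo", ndim_x=1, ndim_y=1, n_centers=5)
    mdn.fit(X, Y, verbose=False)
    densities = mdn.pdf(X[:10], Y[:10])  # conditional densities p(y|x)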
"""
def __init__(self, name, ndim_x, ndim_y, n_centers=10, hidden_sizes=(16, 16), hidden_nonlinearity=tf.nn.tanh,
n_training_epochs=1000, x_noise_std=None, y_noise_std=None, entropy_reg_coef=0.0, weight_decay=0.0,
weight_normalization=True, data_normalization=True, dropout=0.0, random_seed=None):
Serializable.quick_init(self, locals())
self._check_uniqueness_of_scope(name)
self.name = name
self.ndim_x = ndim_x
self.ndim_y = ndim_y
self.random_seed = random_seed
self.random_state = np.random.RandomState(seed=random_seed)
tf.set_random_seed(random_seed)
self.n_centers = n_centers
self.hidden_sizes = hidden_sizes
self.hidden_nonlinearity = hidden_nonlinearity
self.n_training_epochs = n_training_epochs
# regularization parameters
self.x_noise_std = x_noise_std
self.y_noise_std = y_noise_std
self.entropy_reg_coef = entropy_reg_coef
self.weight_decay = weight_decay
self.weight_normalization = weight_normalization
self.data_normalization = data_normalization
self.dropout = dropout
self.can_sample = True
self.has_pdf = True
self.has_cdf = True
self.fitted = False
# build tensorflow model
self._build_model()
def fit(self, X, Y, random_seed=None, verbose=True, eval_set=None, **kwargs):
""" Fits the conditional density model with provided data
Args:
X: numpy array to be conditioned on - shape: (n_samples, n_dim_x)
Y: numpy array of y targets - shape: (n_samples, n_dim_y)
eval_set: (tuple) eval/test set - tuple (X_test, Y_test)
verbose: (boolean) controls the verbosity (console output)
"""
X, Y = self._handle_input_dimensionality(X, Y, fitting=True)
if eval_set is not None:
eval_set = self._handle_input_dimensionality(*eval_set)
self._setup_inference_and_initialize()
# data normalization if desired
if self.data_normalization: # this must happen after the initialization
self._compute_data_normalization(X, Y) # computes mean & std of data and assigns it to tf graph for normalization
# train the model
self._partial_fit(X, Y, n_epoch=self.n_training_epochs, verbose=verbose, eval_set=eval_set)
self.fitted = True
def _build_model(self):
"""
implementation of the MDN
"""
with tf.variable_scope(self.name):
# adds placeholders, data normalization and data noise if desired; also adds a placeholder for the dropout probability
self.layer_in_x, self.layer_in_y = self._build_input_layers()
# create core multi-layer perceptron
mlp_output_dim = 2 * self.ndim_y * self.n_centers + self.n_centers
core_network = MLP(
name="core_network",
input_layer=self.layer_in_x,
output_dim=mlp_output_dim,
hidden_sizes=self.hidden_sizes,
hidden_nonlinearity=self.hidden_nonlinearity,
output_nonlinearity=None,
weight_normalization=self.weight_normalization,
dropout_ph=self.dropout_ph if self.dropout else None
)
core_output_layer = core_network.output_layer
# slice the MLP output into three parts for the locs, scales and mixture weights
slice_layer_locs = L.SliceLayer(core_output_layer, indices=slice(0, self.ndim_y * self.n_centers), axis=-1)
slice_layer_scales = L.SliceLayer(core_output_layer, indices=slice(self.ndim_y * self.n_centers, 2 * self.ndim_y * self.n_centers), axis=-1)
slice_layer_weights = L.SliceLayer(core_output_layer, indices=slice(2 * self.ndim_y * self.n_centers, mlp_output_dim), axis=-1)
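# e.g. with ndim_y=2 and n_centers=10 the MLP emits 2*2*10 + 10 = 50 units:
# indices [0, 20) hold the locs, [20, 40) the pre-softplus scales,
# and [40, 50) the mixture weight logits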
# locations mixture components
self.reshape_layer_locs = L.ReshapeLayer(slice_layer_locs, (-1, self.n_centers, self.ndim_y))
self.locs = L.get_output(self.reshape_layer_locs)
# scales of the mixture components
reshape_layer_scales = L.ReshapeLayer(slice_layer_scales, (-1, self.n_centers, self.ndim_y))
self.softplus_layer_scales = L.NonlinearityLayer(reshape_layer_scales, nonlinearity=tf.nn.softplus)
self.scales = L.get_output(self.softplus_layer_scales)
# weights of the mixture components
self.logits = L.get_output(slice_layer_weights)
self.softmax_layer_weights = L.NonlinearityLayer(slice_layer_weights, nonlinearity=tf.nn.softmax)
self.weights = L.get_output(self.softmax_layer_weights)
# put mixture components together
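# the resulting conditional density takes the standard MDN form
#   p(y|x) = sum_k w_k(x) * N(y; mu_k(x), diag(sigma_k(x)^2))
# where w_k are the softmax weights, mu_k the locs and sigma_k the softplus scales computed above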
self.y_input = L.get_output(self.layer_in_y)
self.cat = cat = Categorical(logits=self.logits)
self.components = components = [MultivariateNormalDiag(loc=loc, scale_diag=scale) for loc, scale
in zip(tf.unstack(self.locs, axis=1), tf.unstack(self.scales, axis=1))]
self.mixture = mixture = Mixture(cat=cat, components=components, value=tf.zeros_like(self.y_input))
# softmax entropy penalty -> regularization
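# Shannon entropy of the weight distribution, H(w) = -sum_k w_k * log(w_k),
# computed per sample; it is scaled by entropy_reg_coef and added to the loss below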
self.softmax_entropy = tf.reduce_sum(- tf.multiply(tf.log(self.weights), self.weights), axis=1)
self.entropy_reg_coef_ph = tf.placeholder_with_default(float(self.entropy_reg_coef), name='entropy_reg_coef', shape=())
self.softmax_entropy_loss = self.entropy_reg_coef_ph * self.softmax_entropy
tf.losses.add_loss(self.softmax_entropy_loss, tf.GraphKeys.REGULARIZATION_LOSSES)
# tensor to store samples
self.samples = mixture.sample()  # TODO: either use it or remove it
# tensor to compute probabilities
if self.data_normalization:
self.pdf_ = mixture.prob(self.y_input) / tf.reduce_prod(self.std_y_sym)
self.log_pdf_ = mixture.log_prob(self.y_input) - tf.reduce_sum(tf.log(self.std_y_sym))
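# change-of-variables correction: with y normalized as y_norm = (y - mean) / std,
# p(y) = p_norm(y_norm) / prod_d std_d, hence log p(y) = log p_norm(y_norm) - sum_d log(std_d)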
else:
self.pdf_ = mixture.prob(self.y_input)
self.log_pdf_ = mixture.log_prob(self.y_input)
# symbolic tensors for getting the unnormalized mixture components
if self.data_normalization:
self.scales_unnormalized = self.scales * self.std_y_sym
self.locs_unnormalized = self.locs * self.std_y_sym + self.mean_y_sym
else:
self.scales_unnormalized = self.scales
self.locs_unnormalized = self.locs
# initialize LayersPowered --> provides functions for serializing tf models
LayersPowered.__init__(self, [self.softmax_layer_weights, self.softplus_layer_scales, self.reshape_layer_locs,
self.layer_in_y])
def _param_grid(self):
param_grid = {
"n_training_epochs": [500, 1000],
"n_centers": [5, 10, 20],
"x_noise_std": [0.1, 0.15, 0.2, 0.3],
"y_noise_std": [0.1, 0.15, 0.2]
}
return param_grid
def _get_mixture_components(self, X):
assert self.fitted
weights, locs, scales = self.sess.run([self.weights, self.locs_unnormalized, self.scales_unnormalized], feed_dict={self.X_ph: X})
assert weights.shape[0] == locs.shape[0] == scales.shape[0] == X.shape[0]
assert weights.shape[1] == locs.shape[1] == scales.shape[1] == self.n_centers
assert locs.shape[2] == scales.shape[2] == self.ndim_y
assert locs.ndim == 3 and scales.ndim == 3 and weights.ndim == 2
return weights, locs, scales
def __str__(self):
return "\nEstimator type: {}\n n_centers: {}\n entropy_reg_coef: {}\n data_normalization: {} \n weight_normalization: {}\n" \
"n_training_epochs: {}\n x_noise_std: {}\n y_noise_std: {}\n ".format(self.__class__.__name__, self.n_centers, self.entropy_reg_coef,
self.data_normalization, self.weight_normalization, self.n_training_epochs, self.x_noise_std, self.y_noise_std)