Source code for cde.density_estimator.KMN

#
# code skeleton from https://github.com/janvdvegt/KernelMixtureNetwork
# this version additionally supports fit_by_crossval and multidimensional Y
#
import math
import numpy as np
import sklearn
import tensorflow as tf
import edward as ed
from edward.models import Categorical, Mixture, MultivariateNormalDiag
from cde.utils.tf_utils.network import MLP
import cde.utils.tf_utils.layers as L
from cde.utils.tf_utils.layers_powered import LayersPowered
from cde.utils.serializable import Serializable
#import matplotlib.pyplot as plt


from cde.utils.center_point_select import sample_center_points
from cde.density_estimator.BaseNNMixtureEstimator import BaseNNMixtureEstimator

import logging
logging.getLogger("tensorflow").setLevel(logging.ERROR)
tf.logging.set_verbosity(tf.logging.ERROR)


class KernelMixtureNetwork(BaseNNMixtureEstimator):
  """ Kernel Mixture Network Estimator

    https://arxiv.org/abs/1705.07111

    Args:
      name: (str) name space of MDN (should be unique in code, otherwise tensorflow namespace collisions may arise)
      ndim_x: (int) dimensionality of x variable
      ndim_y: (int) dimensionality of y variable
      center_sampling_method: String that describes the method to use for finding kernel centers. Allowed values
                              [all, random, distance, k_means, agglomerative]
      n_centers: Number of kernels to use in the output
      keep_edges: Keep the extreme y values as centers to keep expressiveness
      init_scales: List or scalar that describes (initial) values of the bandwidth parameter
      train_scales: Boolean that describes whether or not to make the scales trainable
      x_noise_std: (optional) standard deviation of Gaussian noise over the training data X -> regularization through noise.
                   Adding noise is automatically deactivated during inference.
      y_noise_std: (optional) standard deviation of Gaussian noise over the training data Y -> regularization through noise
      entropy_reg_coef: (optional) scalar float coefficient for a Shannon entropy penalty on the mixture component weight distribution
      weight_decay: (float) the amount of decoupled (http://arxiv.org/abs/1711.05101) weight decay to apply
      weight_normalization: (boolean) whether weight normalization shall be used
      data_normalization: (boolean) whether to normalize the data (X and Y) to exhibit zero mean and unit standard deviation
      dropout: (float) the probability of switching off nodes during training
      random_seed: (optional) seed (int) of the random number generators used
  """

  def __init__(self, name, ndim_x, ndim_y, center_sampling_method='k_means', n_centers=50, keep_edges=True,
               init_scales='default', hidden_sizes=(16, 16), hidden_nonlinearity=tf.nn.tanh, train_scales=True,
               n_training_epochs=1000, x_noise_std=None, y_noise_std=None, entropy_reg_coef=0.0, weight_decay=0.0,
               weight_normalization=True, data_normalization=True, dropout=0.0, random_seed=None):

    Serializable.quick_init(self, locals())
    self._check_uniqueness_of_scope(name)

    self.name = name
    self.ndim_x = ndim_x
    self.ndim_y = ndim_y

    self.random_seed = random_seed
    self.random_state = np.random.RandomState(seed=random_seed)
    tf.set_random_seed(random_seed)

    self.n_centers = n_centers

    self.hidden_sizes = hidden_sizes
    self.hidden_nonlinearity = hidden_nonlinearity

    self.n_training_epochs = n_training_epochs

    # center sampling parameters
    self.center_sampling_method = center_sampling_method
    self.keep_edges = keep_edges

    # regularization parameters
    self.x_noise_std = x_noise_std
    self.y_noise_std = y_noise_std
    self.entropy_reg_coef = entropy_reg_coef
    self.weight_decay = weight_decay
    self.weight_normalization = weight_normalization
    self.data_normalization = data_normalization
    self.dropout = dropout

    if init_scales == 'default':
      init_scales = np.array([0.7, 0.3])

    self.n_scales = len(init_scales)
    self.train_scales = train_scales
    self.init_scales = init_scales
    # transform scales so that the softplus will result in the passed init_scales
    self.init_scales_softplus = [np.log(np.exp(s) - 1) for s in init_scales]

    self.can_sample = True
    self.has_pdf = True
    self.has_cdf = True

    self.fitted = False

    # build tensorflow model
    self._build_model()
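  # Note (added for clarity, not part of the original source): init_scales_softplus
  # is the inverse softplus of init_scales, i.e. softplus(log(exp(s) - 1)) = s,
  # so after the softplus applied in _build_model the bandwidths start exactly at
  # the requested init_scales. E.g. for s = 0.7: log(exp(0.7) - 1) ~= 0.0137 and
  # softplus(0.0137) = log(1 + exp(0.0137)) ~= 0.7.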
  def fit(self, X, Y, eval_set=None, verbose=True):
    """ Fits the conditional density model with provided data

      Args:
        X: numpy array to be conditioned on - shape: (n_samples, n_dim_x)
        Y: numpy array of y targets - shape: (n_samples, n_dim_y)
        eval_set: (tuple) eval/test set - tuple (X_test, Y_test)
        verbose: (boolean) controls the verbosity (console output)
    """
    X, Y = self._handle_input_dimensionality(X, Y, fitting=True)

    if eval_set is not None:
      eval_set = self._handle_input_dimensionality(*eval_set)

    self._setup_inference_and_initialize()

    # data normalization if desired
    if self.data_normalization:  # this must happen after the initialization
      self._compute_data_normalization(X, Y)  # computes mean & std of data and assigns it to tf graph for normalization
      Y_normalized = (Y - self.data_statistics['Y_mean']) / (self.data_statistics['Y_std'] + 1e-8)
    else:
      Y_normalized = Y

    # sample locations and assign them to the tf locs variable
    sampled_locs = sample_center_points(Y_normalized, method=self.center_sampling_method, k=self.n_centers,
                                        keep_edges=self.keep_edges, random_state=self.random_state)
    self.sess.run(tf.assign(self.locs, sampled_locs))

    # train the model
    self._partial_fit(X, Y, n_epoch=self.n_training_epochs, eval_set=eval_set, verbose=verbose)
    self.fitted = True

    if verbose:
      print("optimal scales: {}".format(self.sess.run(self.scales)))
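  # The conditional density built below (summary added for clarity, not part of
  # the original source) is a mixture over all center/scale combinations:
  #
  #   p(y|x) = sum_{k=1..K} sum_{s=1..S} w_{k,s}(x) * N(y; loc_k, scale_s^2 * I)
  #
  # where the MLP maps x to the K*S mixture weights w(x) via a softmax, the
  # centers loc_k are sampled from the (normalized) training targets in fit(),
  # and the bandwidths scale_s are shared across centers and optionally trainable.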
  def _build_model(self):
    """ implementation of the KMN """
    with tf.variable_scope(self.name):
      # add placeholders, data normalization and data noise if desired. Also sets up the placeholder for the dropout prob
      self.layer_in_x, self.layer_in_y = self._build_input_layers()
      self.X_in = L.get_output(self.layer_in_x)
      self.Y_in = L.get_output(self.layer_in_y)

      # get batch size
      self.batch_size = tf.shape(self.X_ph)[0]

      # create core multi-layer perceptron
      core_network = MLP(
        name="core_network",
        input_layer=self.layer_in_x,
        output_dim=self.n_centers * self.n_scales,
        hidden_sizes=self.hidden_sizes,
        hidden_nonlinearity=self.hidden_nonlinearity,
        output_nonlinearity=None,
        dropout_ph=self.dropout_ph if self.dropout else None
      )

      self.core_output_layer = core_network.output_layer

      # weights of the mixture components
      self.logits = L.get_output(self.core_output_layer)
      self.softmax_layer_weights = L.NonlinearityLayer(self.core_output_layer, nonlinearity=tf.nn.softmax)
      self.weights = L.get_output(self.softmax_layer_weights)

      # locations of the kernel functions
      self.locs = tf.Variable(np.zeros((self.n_centers, self.ndim_y)), name="locs",
                              trainable=False, dtype=tf.float32)  # sampled locs are assigned when fitting
      self.locs_layer = L.VariableLayer(core_network.input_layer, (self.n_centers, self.ndim_y),
                                        variable=self.locs, name="locs", trainable=False)
      self.locs_array = tf.unstack(tf.transpose(tf.multiply(tf.ones((self.batch_size, self.n_centers, self.ndim_y)), self.locs),
                                                perm=[1, 0, 2]))
      assert len(self.locs_array) == self.n_centers

      # scales of the gaussian kernels
      log_scales_layer = L.VariableLayer(core_network.input_layer, (self.n_scales,),
                                         variable=tf.Variable(self.init_scales_softplus, dtype=tf.float32,
                                                              trainable=self.train_scales),
                                         name="log_scales", trainable=self.train_scales)
      self.scales_layer = L.NonlinearityLayer(log_scales_layer, nonlinearity=tf.nn.softplus)
      self.scales = L.get_output(self.scales_layer)
      self.scales_array = scales_array = tf.unstack(
        tf.transpose(tf.multiply(tf.ones((self.batch_size, self.ndim_y, self.n_scales)), self.scales), perm=[2, 0, 1]))
      assert len(self.scales_array) == self.n_scales

      # put mixture components together
      self.y_input = L.get_output(self.layer_in_y)
      self.cat = cat = Categorical(logits=self.logits)
      self.components = components = [MultivariateNormalDiag(loc=loc, scale_diag=scale)
                                      for loc in self.locs_array for scale in scales_array]
      self.mixture = mixture = Mixture(cat=cat, components=components)

      # softmax entropy penalty -> regularization
      self.softmax_entropy = tf.reduce_sum(- tf.multiply(tf.log(self.weights), self.weights), axis=1)
      self.entropy_reg_coef_ph = tf.placeholder_with_default(float(self.entropy_reg_coef), name='entropy_reg_coef', shape=())
      self.softmax_entrop_loss = self.entropy_reg_coef_ph * self.softmax_entropy
      tf.losses.add_loss(self.softmax_entrop_loss, tf.GraphKeys.REGULARIZATION_LOSSES)

      # tensor to compute probabilities
      if self.data_normalization:
        self.pdf_ = mixture.prob(self.y_input) / tf.reduce_prod(self.std_y_sym)
        self.log_pdf_ = mixture.log_prob(self.y_input) - tf.reduce_sum(tf.log(self.std_y_sym))
      else:
        self.pdf_ = mixture.prob(self.y_input)
        self.log_pdf_ = mixture.log_prob(self.y_input)

      # symbolic tensors for getting the unnormalized mixture components
      if self.data_normalization:
        self.scales_unnormalized = tf.transpose(tf.multiply(tf.ones((self.ndim_y, self.n_scales)), self.scales)) \
                                   * self.std_y_sym  # shape = (n_scales, ndim_y)
        self.locs_unnormalized = self.locs * self.std_y_sym + self.mean_y_sym
      else:
        self.scales_unnormalized = tf.transpose(tf.multiply(tf.ones((self.ndim_y, self.n_scales)), self.scales))  # shape = (n_scales, ndim_y)
        self.locs_unnormalized = self.locs

    # initialize LayersPowered -> provides functions for serializing tf models
    LayersPowered.__init__(self, [self.core_output_layer, self.locs_layer, self.scales_layer, self.layer_in_y])

  def _param_grid(self):
    param_grid = {
      "n_training_epochs": [500, 1000],
      "n_centers": [50, 200],
      "x_noise_std": [0.15, 0.2, 0.3],
      "y_noise_std": [0.1, 0.15, 0.2]
    }
    return param_grid

  def _get_mixture_components(self, X):
    assert self.fitted
    locs, weights, scales = self.sess.run([self.locs_unnormalized, self.weights, self.scales_unnormalized],
                                          feed_dict={self.X_ph: X})
    locs = np.concatenate([np.tile(np.expand_dims(locs[i:i+1], axis=1), (X.shape[0], self.n_scales, 1))
                           for i in range(self.n_centers)], axis=1)
    cov = np.tile(np.expand_dims(scales, axis=0), (X.shape[0], self.n_centers, 1))

    assert weights.shape[0] == locs.shape[0] == cov.shape[0] == X.shape[0]
    assert weights.shape[1] == locs.shape[1] == cov.shape[1] == self.n_centers * self.n_scales
    assert locs.shape[2] == cov.shape[2] == self.ndim_y
    assert locs.ndim == 3 and cov.ndim == 3 and weights.ndim == 2
    return weights, locs, cov

  def __str__(self):
    return "\nEstimator type: {}\n center sampling method: {}\n n_centers: {}\n keep_edges: {}\n init_scales: {}\n " \
           "train_scales: {}\n n_training_epochs: {}\n x_noise_std: {}\n y_noise_std: {}\n".format(
             self.__class__.__name__, self.center_sampling_method, self.n_centers, self.keep_edges,
             self.init_scales_softplus, self.train_scales, self.n_training_epochs, self.x_noise_std, self.y_noise_std)
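

# ---------------------------------------------------------------------------
# Minimal usage sketch (added for illustration, not part of the original
# module). Assumes the cde package and its TF-1.x / edward dependencies are
# installed; the data is synthetic and the hyperparameters are arbitrary.
# pdf() is inherited from the BaseNNMixtureEstimator interface.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
  # synthetic heteroscedastic data: y = x + eps, eps ~ N(0, (0.1 + 0.2|x|)^2)
  rng = np.random.RandomState(22)
  X_train = rng.uniform(-3, 3, size=(1000, 1))
  Y_train = X_train + (0.1 + 0.2 * np.abs(X_train)) * rng.standard_normal((1000, 1))

  kmn = KernelMixtureNetwork('kmn_demo', ndim_x=1, ndim_y=1, n_centers=20,
                             n_training_epochs=500, random_seed=22)
  kmn.fit(X_train, Y_train, verbose=False)

  # query the conditional density p(y | x=0) on a grid of y values
  y_grid = np.linspace(-2.0, 2.0, num=9).reshape(-1, 1)
  x_query = np.zeros_like(y_grid)
  print(kmn.pdf(x_query, y_grid))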