import numpy as np
import scipy.stats as stats
from .BaseConditionalDensitySimulation import BaseConditionalDensitySimulation
from cde.utils.misc import project_to_pos_semi_def
from sklearn.mixture import GaussianMixture as GMM
class GaussianMixture(BaseConditionalDensitySimulation):
"""
This model allows to fit and sample from a uni- bi- or multivariate Gaussian mixture model with diagonal covariance matrices.
The mixture model is composed by a linear combination of an arbitrary number of components n_kernels. Means, covariances and weights are
estimated by Maximum-Likelihood for each component. It is possible to specify the number of kernels to define the modality of the
distribution and also dimensionality for both x and y. The component means are initialized randomly according to given standard
deviation. Also the weights are initialized randomly.
Args:
n_kernels: number of mixture components
ndim_x: dimensionality of X / number of random variables in X
ndim_y: dimensionality of Y / number of random variables in Y
means_std: std. dev. when sampling the kernel means
random_seed: seed for the random_number generator
"""
def __init__(self, n_kernels=5, ndim_x=1, ndim_y=1, means_std=1.5, random_seed=None):
self.random_state = np.random.RandomState(seed=random_seed)
self.random_seed = random_seed
self.has_pdf = True
self.has_cdf = True
self.can_sample = True
""" set parameters, calculate weights, means and covariances """
self.n_kernels = n_kernels
self.ndim = ndim_x + ndim_y
self.ndim_x = ndim_x
self.ndim_y = ndim_y
self.means_std = means_std
self.weights = self._sample_weights(n_kernels) #shape(n_kernels,), sums to one
self.means = self.random_state.normal(loc=np.zeros([self.ndim]), scale=self.means_std, size=[n_kernels, self.ndim]) #shape(n_kernels, n_dims)
""" Sample cov matrixes and assure that cov matrix is pos definite"""
self.covariances_x = project_to_pos_semi_def(np.abs(self.random_state.normal(loc=1, scale=0.5, size=(n_kernels, self.ndim_x, self.ndim_x)))) #shape(n_kernels, ndim_x, ndim_y)
self.covariances_y = project_to_pos_semi_def(np.abs(self.random_state.normal(loc=1, scale=0.5, size=(n_kernels, self.ndim_y, self.ndim_y)))) # shape(n_kernels, ndim_x, ndim_y)
""" some eigenvalues of the sampled covariance matrices can be exactly zero -> map to positive
semi-definite subspace """
self.covariances = np.zeros(shape=(n_kernels, self.ndim, self.ndim))
self.covariances[:, :ndim_x, :ndim_x] = self.covariances_x
self.covariances[:, ndim_x:, ndim_x:] = self.covariances_y
""" after mapping, define the remaining variables and collect frozen multivariate variables
(x,y), x and y for later conditional draws """
self.means_x = self.means[:, :ndim_x]
self.means_y = self.means[:, ndim_x:]
self.gaussians, self.gaussians_x, self.gaussians_y = [], [], []
for i in range(n_kernels):
      self.gaussians.append(stats.multivariate_normal(mean=self.means[i], cov=self.covariances[i]))
      self.gaussians_x.append(stats.multivariate_normal(mean=self.means_x[i], cov=self.covariances_x[i]))
      self.gaussians_y.append(stats.multivariate_normal(mean=self.means_y[i], cov=self.covariances_y[i]))
# approximate data statistics
self.y_mean, self.y_std = self._compute_data_statistics()
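  # The resulting generative model: a component index k ~ Categorical(weights) is drawn first, then
  # (x, y) | k ~ N(means[k], covariances[k]). Since the covariances are block-diagonal, x and y are
  # independent within each component (but not marginally).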
  def pdf(self, X, Y):
""" conditional probability density function P(Y|X)
See "Conditional Gaussian Mixture Models for Environmental Risk Mapping" [Gilardi, Bengio] for the math.
Args:
X: the position/conditional variable for the distribution P(Y|X), array_like, shape:(n_samples, ndim_x)
Y: the on X conditioned variable Y, array_like, shape:(n_samples, ndim_y)
Returns:
the cond. distribution of Y given X, for the given realizations of X with shape:(n_samples,)
"""
X, Y = self._handle_input_dimensionality(X,Y)
P_y = np.stack([self.gaussians_y[i].pdf(Y) for i in range(self.n_kernels)], axis=1) #shape(X.shape[0], n_kernels)
W_x = self._W_x(X)
cond_prob = np.sum(np.multiply(W_x, P_y), axis=1)
assert cond_prob.shape[0] == X.shape[0]
return cond_prob
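  # Sketch of the math implemented above: with the posterior mixture weights W_k(x) from _W_x,
  #   p(y|x) = sum_k W_k(x) * N(y; mu_{y,k}, Sigma_{y,k})
  # which follows from the block-diagonal joint covariance of each component.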
  def cdf(self, X, Y):
""" conditional cumulative probability density function P(Y<y|X=x).
See "Conditional Gaussian Mixture Models for Environmental Risk Mapping" [Gilardi, Bengio] for the math.
Args:
X: the position/conditional variable for the distribution P(Y<y|X=x), array_like, shape:(n_samples, ndim_x)
Y: the on X conditioned variable Y, array_like, shape:(n_samples, ndim_y)
Returns:
the cond. cumulative distribution of Y given X, for the given realizations of X with shape:(n_samples,)
"""
X, Y = self._handle_input_dimensionality(X, Y)
P_y = np.stack([self.gaussians_y[i].cdf(Y) for i in range(self.n_kernels)],
axis=1) # shape(X.shape[0], n_kernels)
W_x = self._W_x(X)
cond_prob = np.sum(np.multiply(W_x, P_y), axis=1)
assert cond_prob.shape[0] == X.shape[0]
return cond_prob
  def joint_pdf(self, X, Y):
""" joint probability density function P(X, Y)
Args:
X: variable X for the distribution P(X, Y), array_like, shape:(n_samples, ndim_x)
Y: variable Y for the distribution P(X, Y) array_like, shape:(n_samples, ndim_y)
Returns:
the joint distribution of X and Y wih shape:(n_samples,)
"""
X, Y = self._handle_input_dimensionality(X,Y)
XY = np.concatenate([X,Y], axis=1)
a = [self.weights[i] * self.gaussians[i].pdf(XY) for i in range(self.n_kernels)]
p_i = np.stack(a, axis=1)
return np.sum(p_i, axis=1)
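  # Sketch of the quantity computed above:
  #   p(x, y) = sum_k w_k * N((x, y); mu_k, Sigma_k)
  # with the unconditional mixture weights w_k (self.weights).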
  def simulate_conditional(self, X):
""" Draws random samples from the conditional distribution
Args:
X: x to be conditioned on when drawing a sample from y ~ p(y|x) - numpy array of shape (n_samples, ndim_x)
Returns:
Conditional random samples y drawn from p(y|x) - numpy array of shape (n_samples, ndim_y)
"""
X = self._handle_input_dimensionality(X)
    if np.all(X == X[0, :]):
return self._simulate_rows_same(X)
else:
return self._simulate_rows_individually(X)
  def simulate(self, n_samples=1000):
""" Draws random samples from the unconditional distribution p(x,y)
Args:
      n_samples: (int) number of samples to be drawn from the joint distribution p(x,y)
Returns:
(X,Y) - random samples drawn from p(x,y) - numpy arrays of shape (n_samples, ndim_x) and (n_samples, ndim_y)
"""
assert n_samples > 0
scales = np.diagonal(self.covariances, axis1=1, axis2=2)
samples = self._sample(self.weights, self.means, scales, n_samples)
    x_samples = samples[:, :self.ndim_x]
    y_samples = samples[:, self.ndim_x:]
assert x_samples.shape == (n_samples, self.ndim_x)
assert y_samples.shape == (n_samples, self.ndim_y)
return x_samples, y_samples
  def mean_(self, x_cond, n_samples=None):
""" Conditional mean of the distribution
Args:
x_cond: different x values to condition on - numpy array of shape (n_values, ndim_x)
Returns:
Means E[y|x] corresponding to x_cond - numpy array of shape (n_values, ndim_y)
"""
assert x_cond.ndim == 2 and x_cond.shape[1] == self.ndim_x
W_x = self._W_x(x_cond)
means = W_x.dot(self.means_y)
return means
  def covariance(self, x_cond, n_samples=None):
""" Covariance of the distribution conditioned on x_cond
Args:
x_cond: different x values to condition on - numpy array of shape (n_values, ndim_x)
Returns:
Covariances Cov[y|x] corresponding to x_cond - numpy array of shape (n_values, ndim_y, ndim_y)
"""
assert x_cond.ndim == 2 and x_cond.shape[1] == self.ndim_x
W_x = self._W_x(x_cond)
covs = np.zeros((x_cond.shape[0], self.ndim_y, self.ndim_y))
glob_mean = self.mean_(x_cond)
    for i in range(x_cond.shape[0]):
      c1 = np.zeros((self.ndim_y, self.ndim_y))
      c2 = np.zeros(c1.shape)
      weights = W_x[i]
      for j in range(weights.shape[0]):
        # weighted average of the component covariances (note the accumulation with +=)
        c1 += weights[j] * self.covariances_y[j]
        # weighted spread of the component means around the conditional mean
        a = self.means_y[j] - glob_mean[i]
        c2 += weights[j] * np.outer(a, a)
      covs[i] = c1 + c2
return covs
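  # Sketch of the math implemented above (law of total variance for a mixture):
  #   Cov[y|x] = sum_j W_j(x) * Sigma_{y,j} + sum_j W_j(x) * (mu_{y,j} - E[y|x]) (mu_{y,j} - E[y|x])^T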
def _simulate_rows_individually(self, X):
W_x = self._W_x(X)
Y = np.zeros(shape=(X.shape[0], self.ndim_y))
for i in range(X.shape[0]):
      discrete_dist = stats.rv_discrete(values=(range(self.n_kernels), W_x[i, :]))
      idx = discrete_dist.rvs(random_state=self.random_state)
      Y[i, :] = self.gaussians_y[idx].rvs(random_state=self.random_state)
assert X.shape[0] == Y.shape[0]
return X, Y
def _draw_from_discrete(self, w_x):
    discrete_dist = stats.rv_discrete(values=(range(self.n_kernels), w_x))
    idx = discrete_dist.rvs(random_state=self.random_state)
    return self.gaussians_y[idx].rvs(random_state=self.random_state)
def _simulate_rows_same(self, X):
n_samples = X.shape[0]
weights = self._W_x(X)[0]
scales = np.diagonal(self.covariances_y, axis1=1, axis2=2)
y_sample = self._sample(weights, self.means_y, scales, n_samples)
return X, y_sample
  def _sample(self, weights, locs, scales, n_samples):
    # fit a throwaway sklearn GMM on noise of the right dimensionality, then overwrite its
    # parameters so that its sample() routine can be reused with the desired weights, means
    # and (diagonal) variances
    gmm = GMM(n_components=self.n_kernels, covariance_type='diag', max_iter=5, tol=1e-1,
              random_state=self.random_state)
    gmm.fit(self.random_state.normal(size=(100, locs.shape[1])))
    gmm.converged_ = True
    gmm.weights_ = weights
    gmm.means_ = locs
    gmm.covariances_ = scales
    samples, _ = gmm.sample(n_samples)
    return samples
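  # Note: _sample uses only the diagonal variances (covariance_type='diag'), so off-diagonal
  # entries of the sampled covariance matrices are ignored when sampling, whereas pdf/cdf
  # evaluate the full block covariance matrices.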
def _sample_weights(self, n_weights):
""" samples density weights -> sum up to one
Args:
n_weights: number of weights
Returns:
ndarray of weights with shape (n_weights,)
"""
weights = self.random_state.uniform(0, 1, size=[n_weights])
return weights / np.sum(weights)
def _W_x(self, X):
""" Helper function to normalize the joint density P(Y,X) by the marginal density P(X)
Args:
X: conditional random variable, array_like, shape:(n_samples, ndim_x)
Return:
the normalized weighted marginal gaussian distributions P(X) for each n_kernel, shape:(n_samples,n_kernels)
"""
assert X.ndim == 2 and X.shape[1] == self.ndim_x
if X.shape[0] == 1:
w_p = np.stack([np.array([self.weights[i] * self.gaussians_x[i].pdf(X)]) for i in range(self.n_kernels)], axis=1)
else:
w_p = np.stack([self.weights[i] * self.gaussians_x[i].pdf(X) for i in range(self.n_kernels)], axis=1)
normalizing_term = np.sum(w_p, axis=1)
result = w_p / normalizing_term[:,None]
return result
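  # Sketch of the quantity computed by _W_x:
  #   W_k(x) = w_k * N(x; mu_{x,k}, Sigma_{x,k}) / sum_j w_j * N(x; mu_{x,j}, Sigma_{x,j})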
  def __str__(self):
    return "\nProbabilistic model type: {}\nn_kernels: {}\nn_dim_x: {}\nn_dim_y: {}\nmeans_std: {}\n".format(
      self.__class__.__name__, self.n_kernels, self.ndim_x, self.ndim_y, self.means_std)
def __unicode__(self):
return self.__str__()
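# A minimal usage sketch (illustrative only; running this as a script requires the surrounding
# cde package so that the relative imports above resolve):
if __name__ == "__main__":
  model = GaussianMixture(n_kernels=3, ndim_x=1, ndim_y=1, random_seed=42)
  X, Y = model.simulate(n_samples=500)  # draw (x, y) pairs from the joint distribution p(x, y)
  densities = model.pdf(X, Y)           # evaluate the conditional density p(y|x) at the samples
  x_cond = np.array([[0.0], [1.0]])
  print(model.mean_(x_cond))            # conditional means E[y|x], shape (2, 1)
  print(model.covariance(x_cond))       # conditional covariances Cov[y|x], shape (2, 1, 1)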