Source code for cde.density_estimator.NKDE

import numpy as np
from sklearn.preprocessing import normalize
from scipy.stats import multivariate_normal
import scipy.optimize as optimize
import warnings

from cde.utils.misc import norm_along_axis_1
from .BaseDensityEstimator import BaseDensityEstimator
from cde.utils.async_executor import execute_batch_async_pdf
from scipy.special import logsumexp

_MULTIPROC_THRESHOLD = 10 ** 4
_N_POINT_OUT_OF_RANGE = 5  # number of closest points to fall back to if a query point has too few neighbors within the epsilon range
_DEFAULT_EPSILON = 0.4
_MAX_ITER_CV_ML_OPTIMIZER = 30

class NeighborKernelDensityEstimation(BaseDensityEstimator):
  """ Epsilon-Neighbor Kernel Density Estimation (lazy learner) with Gaussian Kernels

    Args:
      name: (str) name / identifier of estimator
      ndim_x: (int) dimensionality of x variable
      ndim_y: (int) dimensionality of y variable
      epsilon: size of the (normalized) neighborhood region
      bandwidth: (float or array_like) initial bandwidth parameter
      param_selection: parameter selection method. Must be
                       - None or False: use the provided epsilon and bandwidth
                       - normal_reference: bandwidths are chosen according to the normal reference distribution
                       - cv_ml: select bandwidth and epsilon via maximum-likelihood leave-one-out cross-validation
      weighted: if True, the neighborhood Gaussians are weighted according to their distance to the query point;
                if False, all neighborhood Gaussians are weighted equally
      n_jobs: (int) number of jobs for parallelized evaluation of large query batches (-1 uses all available cores)
      random_seed: (optional) seed (int) of the random number generators used
  """

  def __init__(self, name='NKDE', ndim_x=None, ndim_y=None, epsilon=0.4, bandwidth=0.6,
               param_selection='normal_reference', weighted=True, n_jobs=-1, random_seed=None):
    self.random_state = np.random.RandomState(seed=random_seed)

    assert isinstance(bandwidth, (int, float)) or isinstance(bandwidth, np.ndarray)
    assert isinstance(epsilon, (int, float))
    assert param_selection in ['cv_ml', 'normal_reference', None, False]

    self.name = name
    self.ndim_x = ndim_x
    self.ndim_y = ndim_y
    self.random_seed = random_seed

    self.epsilon = epsilon
    self.bandwidth = bandwidth
    self.parameter_selection = param_selection
    self.weighted = weighted
    self.n_jobs = n_jobs

    self.fitted = False
    self.can_sample = False
    self.has_pdf = True
    self.has_cdf = False
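  # Editorial note (not part of the original source): NKDE models the
  # conditional density as a locally weighted kernel density estimate over
  # the training targets. For a query x, only training points whose
  # normalized x_i lie within an epsilon-ball around x contribute:
  #
  #   p(y|x) = sum_i w_i(x) * N(y; y_i, diag(bandwidth^2))
  #
  # where the weights w_i(x) are non-zero only for the epsilon-neighbors of x
  # and sum to one (distance-proportional if weighted=True, uniform otherwise).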
  def fit(self, X, Y, **kwargs):
    """ Since NKDE is a lazy learner, fit just stores the provided training data (X, Y)

      Args:
        X: numpy array to be conditioned on - shape: (n_samples, n_dim_x)
        Y: numpy array of y targets - shape: (n_samples, n_dim_y)
    """
    X, Y = self._handle_input_dimensionality(X, Y, fitting=True)
    self._build_model(X, Y)

    self.can_sample = False
    self.has_cdf = False

    self.fitted = True
  def pdf(self, X, Y):
    """ Predicts the conditional probability density p(y|x). Requires the model to be fitted.

      Args:
        X: numpy array to be conditioned on - shape: (n_samples, n_dim_x)
        Y: numpy array of y targets - shape: (n_samples, n_dim_y)

      Returns:
        conditional probability density p(y|x) - numpy array of shape (n_query_samples, )
    """
    return np.exp(self.log_pdf(X, Y))
  def log_pdf(self, X, Y):
    """ Predicts the conditional log-probability log p(y|x). Requires the model to be fitted.

      Args:
        X: numpy array to be conditioned on - shape: (n_samples, n_dim_x)
        Y: numpy array of y targets - shape: (n_samples, n_dim_y)

      Returns:
        conditional log-probability log p(y|x) - numpy array of shape (n_query_samples, )
    """
    X, Y = self._handle_input_dimensionality(X, Y, fitting=True)

    n_samples = X.shape[0]
    if n_samples >= _MULTIPROC_THRESHOLD:
      return execute_batch_async_pdf(self._log_pdf, X, Y, n_jobs=self.n_jobs)
    else:
      return self._log_pdf(X, Y)
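  # Editorial note (not part of the original source): for batches of
  # n_samples >= _MULTIPROC_THRESHOLD (10**4) query points, log_pdf dispatches
  # the evaluation through execute_batch_async_pdf, which splits the batch
  # across n_jobs worker processes; smaller batches are evaluated serially
  # in the calling process.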
  def sample(self, X):
    raise NotImplementedError("Neighbor Kernel Density Estimation is a lazy learner and does not support sampling")
  def loo_likelihood(self, bandwidth, epsilon):
    """ Computes the mean leave-one-out log-likelihood of the training data

      Args:
        bandwidth: bandwidth parameter
        epsilon: size of the (normalized) neighborhood region
    """
    kernel_weights = self._kernel_weights(self.X_train, epsilon)

    # remove the kernel of the query point x itself and re-normalize the weights
    np.fill_diagonal(kernel_weights, 0.0)
    kernel_weights_loo = kernel_weights / np.sum(kernel_weights, axis=-1, keepdims=True)

    conditional_log_densities = np.zeros(self.n_train_points)
    for i in range(self.n_train_points):
      conditional_log_densities[i] = self._log_density(bandwidth, kernel_weights_loo[i, :], self.Y_train[i, :])

    return np.mean(conditional_log_densities)
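  # Editorial sketch of the leave-one-out re-weighting above: with n = 3
  # training points, fill_diagonal zeroes each point's own kernel weight and
  # the subsequent division re-normalizes each row over the remaining points:
  #
  #   [[0.2, 0.5, 0.3],        [[0.000, 0.625, 0.375],
  #    [0.1, 0.4, 0.5],   -->   [0.167, 0.000, 0.833],
  #    [0.3, 0.3, 0.4]]         [0.500, 0.500, 0.000]]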
  def _build_model(self, X, Y):
    # save mean and std of the data for normalization
    self.x_std = np.std(X, axis=0)
    self.x_mean = np.mean(X, axis=0)
    self.y_mean = np.mean(Y, axis=0)  # fixed: was erroneously assigned np.std(Y, axis=0)
    self.y_std = np.std(Y, axis=0)

    self.n_train_points = X.shape[0]

    # lazy learner - just store the training data
    self.X_train = self._normalize_x(X)
    self.Y_train = Y

    # prepare Gaussians centered in the Y points; the kernel is evaluated on the
    # standardized residual (y - y_i) / bw, so it must be centered at zero
    # (fixed: was mean=np.ones(self.ndim_y))
    self.locs_array = np.vsplit(Y, self.n_train_points)
    self.log_kernel = multivariate_normal(mean=np.zeros(self.ndim_y)).logpdf

    # select / properly initialize bandwidth and epsilon
    if isinstance(self.bandwidth, (int, float)):
      self.bandwidth = self.y_std * self.bandwidth

    if self.parameter_selection == 'normal_reference':
      self.bandwidth = self._normal_reference()
    elif self.parameter_selection == 'cv_ml':
      self.bandwidth, self.epsilon = self._cv_ml()

  def _log_pdf(self, X, Y):
    # 1. determine the weights of the Gaussians
    X_normalized = self._normalize_x(X)
    kernel_weights = self._kernel_weights(X_normalized, self.epsilon)

    # 2. calculate the conditional log-densities
    n_samples = X.shape[0]
    conditional_log_densities = np.zeros(n_samples)
    for i in range(n_samples):
      conditional_log_densities[i] = self._log_density(self.bandwidth, kernel_weights[i, :], Y[i, :])

    return conditional_log_densities

  def _kernel_weights(self, X_normalized, epsilon):
    X_dist = norm_along_axis_1(X_normalized, self.X_train, norm_dim=True)

    mask = X_dist > epsilon
    num_neighbors = np.sum(np.logical_not(mask), axis=1)

    # extra treatment for query points with at most _N_POINT_OUT_OF_RANGE neighbors
    # in the epsilon region: fall back to the _N_POINT_OUT_OF_RANGE closest points
    if np.any(num_neighbors <= _N_POINT_OUT_OF_RANGE):
      for i in np.nditer(np.where(num_neighbors <= _N_POINT_OUT_OF_RANGE)):
        closest_indices = np.argsort(X_dist[i, :])[:_N_POINT_OUT_OF_RANGE]
        for j in np.nditer(closest_indices):
          mask[i, j] = False
      num_neighbors = np.sum(np.logical_not(mask), axis=1)

    neighbor_distances = np.ma.masked_where(mask, X_dist)

    if self.weighted:
      # neighbors are weighted in proportion to their distance to the query point
      neighbor_weights = normalize(neighbor_distances.filled(fill_value=0), norm='l1', axis=1)
    else:
      # all neighbors are weighted equally
      with warnings.catch_warnings():
        warnings.simplefilter("ignore")  # suppress division-by-zero warnings
        weights = 1 / num_neighbors
      neighbor_weights = neighbor_distances.copy()
      neighbor_weights[:] = weights[:, None]
      neighbor_weights = np.ma.masked_where(mask, neighbor_weights).filled(fill_value=0)

    return neighbor_weights

  def _log_density(self, bw, neighbor_weights, y):
    assert neighbor_weights.shape[0] == self.n_train_points
    assert y.shape[0] == self.ndim_y

    log_single_densities = np.array([self._single_log_density(bw, neighbor_weights[i], i, y)
                                     for i in np.nditer(np.nonzero(neighbor_weights))])
    return logsumexp(log_single_densities)

  def _single_log_density(self, bw, neighbor_weight, kernel_id, y):
    # log( w_i * N((y - y_i) / bw; 0, I) / prod(bw) )
    return np.log(neighbor_weight) + self.log_kernel((y - self.Y_train[kernel_id, :]) / bw) - np.sum(np.log(bw))

  def _param_grid(self):
    mean_std_y = np.mean(self.y_std)
    mean_std_x = np.mean(self.x_std)
    bandwidths = np.asarray([0.2, 0.5, 0.7]) * mean_std_y
    epsilons = np.asarray([0.05, 0.2, 0.4]) * mean_std_x

    param_grid = {
      "bandwidth": bandwidths,
      "epsilon": epsilons,
      "weighted": [True, False]
    }
    return param_grid

  def _normalize_x(self, X):
    X_normalized = (X - self.x_mean) / self.x_std
    assert X_normalized.shape == X.shape
    return X_normalized

  def _normal_reference(self):
    X_dist = norm_along_axis_1(self.X_train, self.X_train, norm_dim=True)

    # average number of training points within the epsilon region of a training
    # point, excluding the point itself
    avg_num_neighbors = np.mean(np.ma.masked_where(X_dist > self.epsilon, X_dist).count(axis=1)) - 1

    # Silverman-style rule of thumb based on the average neighborhood size
    return 1.06 * self.y_std * avg_num_neighbors ** (- 1. / (4 + self.ndim_y))

  def _cv_ml(self):
    # use normal_reference as initialization for the bandwidth
    bw = self._normal_reference()
    x0 = np.concatenate([bw, np.array([self.epsilon])], axis=0)

    def loologli(x):
      assert x.shape[0] == self.ndim_y + 1
      bw = x[:self.ndim_y]
      eps = float(x[self.ndim_y])
      return - self.loo_likelihood(bw, epsilon=eps)

    x_opt = optimize.fmin(loologli, x0=x0, maxiter=_MAX_ITER_CV_ML_OPTIMIZER, disp=0)
    bw_opt, eps_opt = x_opt[:self.ndim_y], x_opt[self.ndim_y]
    return bw_opt, eps_opt

  def __str__(self):
    return "\nEstimator type: {}\n epsilon: {}\n weighted: {}\n bandwidth: {}\n".format(
      self.__class__.__name__, self.epsilon, self.weighted, self.bandwidth)

  def __unicode__(self):
    return self.__str__()
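

# Minimal usage sketch (editorial addition, not part of the original module).
# It fits the estimator on synthetic data and queries conditional densities;
# the data-generating process and all numbers below are illustrative only.
if __name__ == '__main__':
  rng = np.random.RandomState(0)
  X = rng.normal(size=(500, 1))                 # variable to condition on
  Y = 2 * X + 0.5 * rng.normal(size=(500, 1))   # noisy targets

  nkde = NeighborKernelDensityEstimation(ndim_x=1, ndim_y=1,
                                         param_selection='normal_reference')
  nkde.fit(X, Y)                  # lazy fit: stores (X, Y) and selects the bandwidth
  print(nkde.pdf(X[:5], Y[:5]))   # conditional densities p(y|x) for 5 query points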