Source code for kde_diffusion.kde2d

"""Kernel density estimation via diffusion for 2-dimensional data."""


########################################
# Dependencies                         #
########################################
from numpy import array, arange
from numpy import exp, sqrt, pi as π
from numpy import ceil, log2
from numpy import ones
from numpy import product, outer
from numpy import histogram2d
from scipy.fft import dctn, idctn
from scipy.optimize import brentq


########################################
# Main                                 #
########################################

[docs]def kde2d(x, y, n=256, limits=None): """ Estimates the 2d density from discrete observations. The input is two lists/arrays `x` and `y` of numbers that represent discrete observations of a random variable with two coordinate components. The observations are binned on a grid of n×n points. `n` will be coerced to the next highest power of two if it isn't one to begin with. Data `limits` may be specified as a tuple of tuples denoting `((xmin, xmax), (ymin, ymax))`. If any of the values are `None`, they will be inferred from the data. Each tuple, or even both of them, may also be replaced by a single value denoting the upper bound of a range centered at zero. After binning, the function determines the optimal bandwidth according to the diffusion-based method. It then smooths the binned data over the grid using a Gaussian kernel with a standard deviation corresponding to that bandwidth. Returns the estimated `density` and the `grid` (along each of the two axes) upon which it was computed, as well as the optimal `bandwidth` values (per axis) that the algorithm determined. Raises `ValueError` if the algorithm did not converge or `x` and `y` are not the same length. """ # Convert to arrays in case lists are passed in. x = array(x) y = array(y) # Make sure numbers of data points are consistent. N = len(x) if len(y) != N: raise ValueError('x and y must have the same length.') # Round up number of bins to next power of two. n = int(2**ceil(log2(n))) # Determine missing data limits. if limits is None: xmin = xmax = ymin = ymax = None elif isinstance(limits, tuple): (xlimits, ylimits) = limits if xlimits is None: xmin = xmax = None elif isinstance(xlimits, tuple): (xmin, xmax) = xlimits else: xmin = -xlimits xmax = +xlimits if ylimits is None: ymin = ymax = None elif isinstance(ylimits, tuple): (ymin, ymax) = ylimits else: ymin = -ylimits ymax = +ylimits else: xmin = -limits xmax = +limits ymin = -limits ymax = +limits if None in (xmin, xmax): delta = x.max() - x.min() if xmin is None: xmin = x.min() - delta/4 if xmax is None: xmax = x.max() + delta/4 if None in (ymin, ymax): delta = y.max() - y.min() if ymin is None: ymin = y.min() - delta/4 if ymax is None: ymax = y.max() + delta/4 Δx = xmax - xmin Δy = ymax - ymin # Bin samples on regular grid. (binned, xedges, yedges) = histogram2d(x, y, bins=n, range=((xmin, xmax), (ymin, ymax))) grid = (xedges[:-1], yedges[:-1]) # Compute discrete cosine transform, then adjust first component. transformed = dctn(binned/N) transformed[0, :] /= 2 transformed[:, 0] /= 2 # Pre-compute squared indices and transform components before solver loop. k = arange(n, dtype='float') # "float" avoids integer overflow. k2 = k**2 a2 = transformed**2 # Define internal functions to be solved iteratively. def γ(t): Σ = ψ(0, 2, t) + ψ(2, 0, t) + 2*ψ(1, 1, t) γ = (2*π*N*Σ)**(-1/3) return (t - γ) / γ def ψ(i, j, t): if i + j <= 4: Σ = abs(ψ(i+1, j, t) + ψ(i, j+1, t)) C = (1 + 1/2**(i+j+1)) / 3 Πi = product(arange(1, 2*i, 2)) Πj = product(arange(1, 2*j, 2)) t = (C*Πi*Πj / (π*N*Σ)) ** (1/(2+i+j)) w = 0.5 * ones(n) w[0] = 1 w = w * exp(-π**2 * k2*t) wx = w * k2**i wy = w * k2**j return (-1)**(i+j) * π**(2*(i+j)) * wy @ a2 @ wx # Solve for optimal diffusion time t*. try: ts = brentq(lambda t: t - γ(t), 0, 0.1) except ValueError: raise ValueError('Bandwidth optimization did not converge.') from None # Calculate diffusion times along x- and y-axis. ψ02 = ψ(0, 2, ts) ψ20 = ψ(2, 0, ts) ψ11 = ψ(1, 1, ts) tx1 = (ψ02**(3/4) / (4*π*N*ψ20**(3/4) * (ψ11 + sqrt(ψ02*ψ20))) )**(1/3) tx2 = (ψ20**(3/4) / (4*π*N*ψ02**(3/4) * (ψ11 + sqrt(ψ02*ψ20))) )**(1/3) # Note: # The above uses the nomenclature from the paper. In the Matlab # reference, tx1 is called t_y, while tx2 is t_x. This is a curious # change in notation. It may be related to the fact that image # coordinates are typically in (y,x) index order, whereas matrices, # such as the binned histogram (in Matlab as much as in Python), # are in (x,y) order. The Matlab code eventually does return # image-like index order, though it never explicitly transposes # the density matrix. That is implicitly handled by its custom # implementation of the inverse transformation (idct2d), which # only employs one matrix transposition, not two as its forward # counterpart (dct2d). # Apply Gaussian filter with optimized kernel. smoothed = transformed * outer(exp(-π**2 * k2 * tx2/2), exp(-π**2 * k2 * tx1/2)) # Reverse transformation after adjusting first component. smoothed[0, :] *= 2 smoothed[:, 0] *= 2 inverse = idctn(smoothed) # Normalize density. density = inverse * n/Δx * n/Δy # Determine bandwidth from diffusion times. bandwidth = array([sqrt(tx2)*Δx, sqrt(tx1)*Δy]) # Return results. return (density, grid, bandwidth)