1.2 - Marginal Gaussianization#
import sys, os
from pyprojroot import here
# spyder up to find the root
root = here(project_files=[".here"])
# append to path
sys.path.append(str(root))
%load_ext autoreload
%autoreload 2
Import Packages#
from typing import Union
import numpy as np
from sklearn.datasets import make_moons
from scipy import stats
# Packages
import numpy as np
import pytorch_lightning as pl
# Plot
import matplotlib.pyplot as plt
import corner
import wandb
pl.seed_everything(1234)
%load_ext autoreload
%autoreload 2
Global seed set to 1234
The autoreload extension is already loaded. To reload it, use:
%reload_ext autoreload
Logging#
# TODO
Data#
ntrain = 100_000
ntest = 10_000
noise = 0.1
random_state = 123
train_data = make_moons(n_samples=ntrain, noise=noise, random_state=random_state)[0]
test_data = make_moons(n_samples=ntest, noise=noise, random_state=random_state*10)[0]
Plot Data#
fig = plt.figure(figsize=(7, 7))
corner.corner(train_data, color="blue", fig=fig)
plt.show()
data:image/s3,"s3://crabby-images/5003c/5003c1e796a61eb50b15091daac5504ce3548d94" alt="../../../_images/0631919fa78747183aa5c99b39ae6feae11d9b62af81b18e916f065532b67fc3.png"
Flow Transformation#
class Bijector:
def forward(self, X):
raise NotImplemented
def inverse(self, X):
raise NotImplemented
def gradient(self, X):
raise NotImplemented
Marginal Gaussianization#
Marginal Uniformization#
class MarginalHistogramUniformization:
def __init__(self, X: np.ndarray, bins: Union[int,str]="auto", alpha: float=1e-10, bound_ext: float=0.1):
estimators = []
for iX in X.T:
diff = iX.max() - iX.min()
lower_bound = iX.min() - bound_ext * diff
upper_bound = iX.max() + bound_ext * diff
# create histogram
hist = np.histogram(iX, bins=bins, range=(lower_bound, upper_bound))
# create histogram object
i_estimator = stats.rv_histogram(hist)
# add some regularization
i_estimator._hpdf += alpha
estimators.append(i_estimator)
self.estimators = estimators
def forward(self, X):
Z = np.zeros_like(X)
for idim, iX in enumerate(X.T):
Z[:, idim] = self.estimators[idim].cdf(iX)
return Z
def inverse(self, Z):
X = np.zeros_like(Z)
for idim, iZ in enumerate(Z.T):
X[:, idim] = self.estimators[idim].ppf(iZ)
return X
def gradient(self, X):
X_grad = np.zeros_like(X)
for idim, iX in enumerate(X.T):
X_grad[:, idim] = self.estimators[idim].pdf(iX)
return X_grad
fig = corner.corner(train_data)
data:image/s3,"s3://crabby-images/ab5ba/ab5baf40a626fda4bd692d34240b16a9b8475c13" alt="../../../_images/f21e0ce4504e1df4723067ea433c70c533a9ab8afebfe0f0b8c1fccd0cb83888.png"
from rbig._src.uniform import MarginalHistogramUniformization
marg_hist_bijector = MarginalHistogramUniformization(X=train_data)
X_u = marg_hist_bijector.forward(train_data)
fig = corner.corner(X_u)
data:image/s3,"s3://crabby-images/f7fbe/f7fbe253b4be3107aabe373155e42b3379fc63f3" alt="../../../_images/0bf084fd371c290022b1f27bc9a7babdf7a9a2f886f872bd0d36848b3019ba3b.png"
Inverse Transforms#
X_approx = marg_hist_bijector.inverse(X_u)
fig = corner.corner(X_approx)
data:image/s3,"s3://crabby-images/ab5ba/ab5baf40a626fda4bd692d34240b16a9b8475c13" alt="../../../_images/f21e0ce4504e1df4723067ea433c70c533a9ab8afebfe0f0b8c1fccd0cb83888.png"
Inverse Gaussian CDF#
from rbig._src.invcdf import InverseGaussCDF
invcdf = InverseGaussCDF(eps=1e-5)
Forward#
X_g = invcdf.forward(X_u)
fig = corner.corner(X_g)
data:image/s3,"s3://crabby-images/60483/60483defb8a6ae10b5a2daa8b4c050f8f767748d" alt="../../../_images/8a7292a22b722e12b0a96e9f59a0025c2b04954afd6ada26581ec5b99e07ecbc.png"
Inverse#
X_u_approx = invcdf.inverse(X_g)
fig = corner.corner(X_u_approx)
data:image/s3,"s3://crabby-images/6812b/6812bd96fc9d5cc17bbd4308a0cdf3e672eff57c" alt="../../../_images/331572c2029db1da7f6f30952354a82704c8919a22e8441389bc6641a2bf8b2b.png"
Jacobian#
X_g_grad = invcdf.gradient(X_u)
fig = corner.corner(X_g_grad)
data:image/s3,"s3://crabby-images/fa98b/fa98b6bf83d7d149bec93be47d854599499a3b3c" alt="../../../_images/bf256dd827ce8b9806e23b74e9c775617bb9d0c35303a066e24ed8af606407d7.png"
Composing Transfirmations#
from rbig._src.base import CompositeBijector
bijectors = [marg_hist_bijector, invcdf]
marg_gauss_bijector = CompositeBijector(bijectors)
X_g = marg_gauss_bijector.forward(train_data)
fig = corner.corner(X_g)
data:image/s3,"s3://crabby-images/60483/60483defb8a6ae10b5a2daa8b4c050f8f767748d" alt="../../../_images/8a7292a22b722e12b0a96e9f59a0025c2b04954afd6ada26581ec5b99e07ecbc.png"
X_approx = marg_gauss_bijector.inverse(X_g)
fig = corner.corner(X_approx)
data:image/s3,"s3://crabby-images/57f6a/57f6a7353d5487b33a565b1156177b54c7841fef" alt="../../../_images/af4fb33468d5c9f39c36453c55c8b7933d4758e1f6472bc0942d1cda9a1a152a.png"
Jacobian#
X_ldj = marg_gauss_bijector.gradient(train_data)
fig = corner.corner(X_ldj)
data:image/s3,"s3://crabby-images/25196/25196082bdd3bececf558581c283dc11c9a0ffc6" alt="../../../_images/137d1405ea6c41da8de55db35344e53e047e5ac34197f7019026d3526ff1ee19.png"
Density Estimation#
# 1) latent prob
Xg = marg_gauss_bijector.forward(train_data)
# latent probability
base_dist = stats.norm(loc=0, scale=1)
pz = base_dist.pdf(Xg).sum(axis=1)
# 2) prob
X_detjacobian = marg_gauss_bijector.gradient(train_data)
# total prob
p_x = pz * X_detjacobian
fig = corner.corner(p_x)
data:image/s3,"s3://crabby-images/bcc02/bcc02271baaed9901db590be6fbc0f0a94cec00a" alt="../../../_images/7108ded9e5a302156c0833df29b2b71a00605ce9893c6e9aea4b52b2ee57be66.png"
Sampling#
# 1) samples from latent probability
base_dist = stats.norm(loc=0, scale=1)
pz_samples = np.vstack([base_dist.rvs(size=10_000, random_state=123), base_dist.rvs(size=10_000, random_state=42)]).T
fig = corner.corner(pz_samples)
data:image/s3,"s3://crabby-images/b1c45/b1c4571afb5e0ad8995bddba94dbf900cc727665" alt="../../../_images/f701d130f80c7818c6e958ce528e30697115aa335728198aa794acb5c8b2577a.png"
X_samples = marg_gauss_bijector.inverse(pz_samples)
fig = corner.corner(X_samples)
data:image/s3,"s3://crabby-images/7606f/7606ffa7a9485254820ce767b9179cddd15b8ae8" alt="../../../_images/e1f0085e589f9c0de7bf8829049c68b0709986a615c348735c7d0547b897a0ee.png"