Toy Example¶
In this toy example, we walk through the motivation for using information-theoretic measures instead of simple correlation measures.
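To see why plain correlation can be misleading, here is a minimal sketch (not part of the experiment code) in which the Pearson correlation is close to zero even though the output is a deterministic function of the input:
# minimal sketch: Pearson correlation misses a purely quadratic dependence
import numpy as np
from scipy import stats
rng = np.random.default_rng(0)
x = rng.uniform(-1, 1, 5_000)
y = x ** 2                              # deterministic, but not linear
r, _ = stats.pearsonr(x, y)
print(f"Pearson correlation: {r:.3f}")  # near zero despite perfect dependence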
import sys, os
from pyprojroot import here
root = here(project_files=[".here"])
sys.path.append(str(here()))
import pathlib
FIG_PATH = pathlib.Path(str(here())).joinpath('reports/figures/spa_temp/demos')
# standard python packages
import xarray as xr
import pandas as pd
import numpy as np
#
# Experiment Functions
from src.data.esdc import get_dataset
from src.features import Metrics
from src.features.temporal import select_period, get_smoke_test_time
from src.features.spatial import select_region, get_europe
from src.models.train_models import get_similarity_scores
from src.experiments.utils import dict_product, run_parallel_step
from src.features.density import get_density_cubes
from src.features.preprocessing import standardizer_data, get_reference_cube, get_common_indices
from src.models.similarity import cka_coefficient, rv_coefficient, rbig_it_measures
# # esdc tools
# from src.esdc.subset import select_pixel
# from src.esdc.shape import ShapeFileExtract, rasterize
# from esdc.transform import DensityCubes
from typing import List, Dict
import xarray as xr
from scipy import stats
from tqdm import tqdm
import cartopy
import cartopy.crs as ccrs
# NUMPY SETTINGS
import numpy as onp
onp.set_printoptions(precision=3, suppress=True)
# MATPLOTLIB Settings
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
# SEABORN SETTINGS
import seaborn as sns
sns.set_context(context='talk',font_scale=0.7)
# sns.set(rc={'figure.figsize': (12, 9.)})
# sns.set_style("whitegrid")
# PANDAS SETTINGS
import pandas as pd
pd.set_option("display.max_rows", 120)
pd.set_option("display.max_columns", 120)
# LOGGING SETTINGS
import sys
import logging
logging.basicConfig(
    level=logging.INFO,
    stream=sys.stdout,
    format='%(asctime)s:%(levelname)s:%(message)s'
)
logger = logging.getLogger()
#logger.setLevel(logging.INFO)
%load_ext autoreload
%autoreload 2
Data¶
For the toy example, we generate a dataset with a single input $\mathbf{x}$ and two outputs: $y_1$, which depends on $\mathbf{x}$ through a non-linear (quadratic) function, and $y_2$, which depends on $\mathbf{x}$ linearly with additive noise.
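Concretely, the relationships implemented in the next cell are:

$$x \sim \mathcal{U}(-1, 1), \qquad y_1 = x^2, \qquad y_2 = 0.5 + 0.5\,\big(x + 0.3\,\varepsilon\big), \qquad \varepsilon \sim \mathcal{N}(0, 1)$$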
# generating functions: quadratic (non-linear) and linear with additive noise
def f1(x): return x**2
def f2(x): return 0.5 + 0.5 * (x + 0.3 * np.random.randn(x.shape[0]))
# sample the input uniformly on [-1, 1]
x_dist = stats.uniform(loc=-1.0, scale=2.0)
x_samples = x_dist.rvs(100)
y1 = f1(x_samples)
y2 = f2(x_samples)
fig, ax = plt.subplots()
ax.scatter(x_samples, y1)
plt.show()
fig, ax = plt.subplots()
ax.scatter(x_samples, y2)
plt.show()
# let's generate some extra samples
x_samples = x_dist.rvs(1000)
y1 = f1(x_samples)
y2 = f2(x_samples)
results_y1 = get_similarity_scores(y1[:, None], x_samples[:, None])
results_y1
results_y2 = get_similarity_scores(y2[:, None], x_samples[:, None])
results_y2
# combined: both copies of the input against both outputs
results_combined = get_similarity_scores(
    np.vstack([x_samples, x_samples]).T,
    np.vstack([y1, y2]).T,
)
results_combined
Method I - Pearson / RV¶
Now we will explore the simplest linear method: the RV coefficient ($\rho V$, the vector correlation coefficient), which is a multi-dimensional extension of the Pearson correlation coefficient. Most people are familiar with the Pearson correlation coefficient:

$$\rho(x, y) = \frac{\operatorname{cov}(x, y)}{\sigma_x \sigma_y} = \frac{\sum_i (x_i - \bar{x})(y_i - \bar{y})}{\sqrt{\sum_i (x_i - \bar{x})^2}\,\sqrt{\sum_i (y_i - \bar{y})^2}}$$
This is very well known in the literature, but it does not apply directly to multi-dimensional data: the numerator and denominator give a scalar value per dimension, and there is no obvious way to summarize all of that information into a single scalar. One extension is to build a matrix of pairwise components (i.e. a Gram matrix) for each variable and then take the Frobenius norm (Hilbert-Schmidt norm) of the cross term as well as of the individual terms. With centered Gram matrices $K_x = \mathbf{XX}^\top$ and $K_y = \mathbf{YY}^\top$, the coefficient is:

$$\rho V(\mathbf{X}, \mathbf{Y}) = \frac{\langle K_x, K_y \rangle_F}{\|K_x\|_F\,\|K_y\|_F}$$
Note: this is very similar to HSIC and Centered Kernel Alignment (CKA), but the RV coefficient predates both. CKA generalizes it with the addition of distance measures and non-linear kernel functions, which we will explore in a later section; in fact, with a plain linear kernel CKA reduces to the RV coefficient, as the sketch below illustrates.
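As a quick check of that claim, here is a minimal NumPy sketch (not part of the experiment code; the helpers `rv_from_covariances` and `linear_cka` are ours) showing that linear-kernel CKA and the RV coefficient agree numerically:
# minimal sketch: linear-kernel CKA coincides with the RV coefficient
import numpy as np

def rv_from_covariances(X, Y):
    """RV coefficient written in terms of cross-/auto-covariance matrices."""
    Xc, Yc = X - X.mean(0), Y - Y.mean(0)
    Sxy, Sxx, Syy = Xc.T @ Yc, Xc.T @ Xc, Yc.T @ Yc
    return np.linalg.norm(Sxy) ** 2 / (np.linalg.norm(Sxx) * np.linalg.norm(Syy))

def linear_cka(X, Y):
    """CKA with a linear kernel on centered Gram matrices."""
    Xc, Yc = X - X.mean(0), Y - Y.mean(0)
    Kx, Ky = Xc @ Xc.T, Yc @ Yc.T
    return np.sum(Kx * Ky) / (np.linalg.norm(Kx) * np.linalg.norm(Ky))

rng = np.random.default_rng(0)
X, Y = rng.normal(size=(200, 3)), rng.normal(size=(200, 2))
print(rv_from_covariances(X, Y), linear_cka(X, Y))  # the two values agree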
To code this up, we will return all of the components of this equation because we will need them later.
from typing import Dict
from sklearn.preprocessing import KernelCenterer
from sklearn.metrics.pairwise import linear_kernel

def rv_coefficient_(X: np.ndarray, Y: np.ndarray) -> Dict:
    """Simple function to calculate the RV coefficient."""
    # calculate the (linear) kernel matrices
    X_gram = linear_kernel(X)
    Y_gram = linear_kernel(Y)
    # center the kernels
    X_gramc = KernelCenterer().fit_transform(X_gram)
    Y_gramc = KernelCenterer().fit_transform(Y_gram)
    # normalizing coefficients (denominator)
    x_norm = np.linalg.norm(X_gramc)
    y_norm = np.linalg.norm(Y_gramc)
    # Frobenius inner product of the cross term (numerator)
    xy_norm = np.einsum("ij,ij->", X_gramc, Y_gramc)
    # RV coefficient
    rv_coeff = xy_norm / x_norm / y_norm
    return {
        "coefficient": rv_coeff,
        "x_norm": x_norm,
        "y_norm": y_norm,
        "xy_norm": xy_norm,
    }
feat_ = rv_coefficient_(x_samples[:, None], y2[:, None])
# print(feat1_coeff)
from src.models.similarity import rv_coefficient
feat1_coeff = rv_coefficient(x_samples[:, None], y2[:, None])
feat1_coeff
# sanity check: our coefficient should match the library implementation
np.testing.assert_allclose(feat_['coefficient'], feat1_coeff['coefficient'])
# correlation for feature 1 (non-linear)
feat1_coeff = rv_coefficient(x_samples[:, None], y1[:, None])
print(feat1_coeff['coefficient'])
# correlation for feature 2 (linear)
feat2_coeff = rv_coefficient_(x_samples[:, None], y2[:, None])
print(feat2_coeff['coefficient'])
# combined correlation
feat3_coeff = rv_coefficient_(
    np.vstack([x_samples, x_samples]).T,
    np.vstack([y1, y2]).T,
)
print(feat3_coeff['coefficient'])
Method II - PDF Estimation¶
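Rather than relying on second-order statistics, this method estimates the probability density of the variables (first with simple histograms, then with RBIG) and derives information-theoretic quantities from it. For reference, the quantities estimated below are the (differential) entropy and the mutual information:

$$H(X) = -\int p(x)\,\log p(x)\,dx, \qquad I(X; Y) = \iint p(x, y)\,\log\frac{p(x, y)}{p(x)\,p(y)}\,dx\,dy$$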
# let's generate some extra samples
x_samples = x_dist.rvs(1000)
y1 = f1(x_samples)
y2 = f2(x_samples)
# marginal entropies from a simple histogram density estimate
it_results = {}
it_results['h_x'] = stats.rv_histogram(np.histogram(x_samples)).entropy()
it_results['h_y1'] = stats.rv_histogram(np.histogram(y1)).entropy()
it_results['h_y2'] = stats.rv_histogram(np.histogram(y2)).entropy()
it_results
from sklearn.metrics import mutual_info_score
from src.models.similarity import rbig_it_measures
# note: mutual_info_score expects discrete labels, so each unique sample value is treated as a label
mutual_info_score(y1, x_samples), mutual_info_score(y2, x_samples)
res = rbig_it_measures(x_samples[:, None], y1[:, None])
res
res = rbig_it_measures(x_samples[:, None], y2[:, None])
res
res = rbig_it_measures(
    np.vstack([x_samples, x_samples]).T,
    np.vstack([y1, y2]).T,
)
res
%%time
from rbig.rbig import RBIGMI, RBIG
rbig_results = {}
n_layers = 10000
rotation_type = 'PCA'
random_state = 0
zero_tolerance = 60
pdf_extension = None
pdf_resolution = None
tolerance = None
# Initialize RBIG class
H_rbig_model = RBIG(
    n_layers=n_layers,
    rotation_type=rotation_type,
    random_state=random_state,
    zero_tolerance=zero_tolerance,
    tolerance=tolerance,
)
# fit the model to each variable and estimate its entropy
rbig_results['H_x'] = H_rbig_model.fit(x_samples[:, None]).entropy(correction=True)
rbig_results['H_y1'] = H_rbig_model.fit(y1[:, None]).entropy(correction=True)
rbig_results['H_y2'] = H_rbig_model.fit(y2[:, None]).entropy(correction=True)
# # calculate the variation of information coefficient
# rbig_results['coefficient'] = variation_of_info(
# rbig_results['H_x'],
# rbig_results['H_y'],
# rbig_results['I_xy']
# )
rbig_results
# Initialize RBIG class
n_layers = 10000
rotation_type = "PCA"
random_state = 0
zero_tolerance = 60
pdf_extension = 10
I_rbig_model = RBIGMI(
    n_layers=n_layers,
    rotation_type=rotation_type,
    random_state=random_state,
    pdf_extension=pdf_extension,
    zero_tolerance=zero_tolerance,
)
# fit the model jointly and estimate the mutual information with each output
rbig_results['I_xy1'] = I_rbig_model.fit(x_samples[:, None], y1[:, None]).mutual_information()
rbig_results['I_xy2'] = I_rbig_model.fit(x_samples[:, None], y2[:, None]).mutual_information()
rbig_results
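The commented-out block above refers to a `variation_of_info` coefficient that combines these quantities. Assuming it follows the standard definition $VI(X, Y) = H(X) + H(Y) - 2\,I(X; Y)$ (an assumption; the helper below is not the implementation in `src`), a minimal sketch would be:
# assumed definition: VI(X, Y) = H(X) + H(Y) - 2 I(X; Y); not necessarily the src implementation
def variation_of_info_(h_x: float, h_y: float, i_xy: float) -> float:
    """Variation of information from marginal entropies and mutual information."""
    return h_x + h_y - 2.0 * i_xy

rbig_results['vi_xy1'] = variation_of_info_(rbig_results['H_x'], rbig_results['H_y1'], rbig_results['I_xy1'])
rbig_results['vi_xy2'] = variation_of_info_(rbig_results['H_x'], rbig_results['H_y2'], rbig_results['I_xy2'])
rbig_results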
# correlation for feature 1
feat1_coeff = rv_coefficient(x_samples[:, None], y1[:, None])
print(feat1_coeff['coefficient'])
# correlation for feature 2
feat2_coeff = rv_coefficient(x_samples[:, None], y2[:, None])
print(feat2_coeff['coefficient'])
Method III - CKA¶
from src.models.similarity import cka_coefficient, estimate_sigma
from sklearn.gaussian_process.kernels import RBF
sigma_X = estimate_sigma(x_samples[:, None], percent=50)
sigma_Y = estimate_sigma(y1[:, None], percent=50)
print(sigma_X, sigma_Y)
# calculate the kernel matrices
X_gram = RBF(sigma_X)(x_samples[:, None])
Y_gram = RBF(sigma_Y)(y1[:, None])
# center the kernels
X_gram = KernelCenterer().fit_transform(X_gram)
Y_gram = KernelCenterer().fit_transform(Y_gram)
# normalizing coefficients (denominator)
x_norm = np.linalg.norm(X_gram)
y_norm = np.linalg.norm(Y_gram)
print(x_norm, y_norm)
# frobenius norm of the cross terms (numerator)
xy_norm = np.sum(X_gram * Y_gram)
print(xy_norm)
# cka coefficient
cka_coeff = xy_norm / x_norm / y_norm
print(cka_coeff)
res_cka1 = cka_coefficient(x_samples[:, None], y1[:, None])
res_cka1
res_cka2 = cka_coefficient(x_samples[:, None], y2[:, None])
res_cka2