Exploring: Variation of Information¶
Background¶
Another way to measure similarity is with the family of Information Theory Measures (ITMs). Instead of directly measuring first-order output statistics, these methods summarize the information via the probability distribution function (PDF) of the dataset. They can capture non-linear relationships and are naturally multivariate, which addresses some shortcomings of the standard methods. I would like to explore whether this is a useful way of summarizing information.
Code¶
import numpy as np
import seaborn as sns
import pandas as pd
import statsmodels.api as smi
import sys
sys.path.insert(0, '/home/emmanuel/code/kernel_model_zoo/')
import matplotlib.pyplot as plt
plt.style.use('seaborn-talk')
%matplotlib inline
%load_ext autoreload
%autoreload 2
from kernellib.dependence import HSIC
SAVE_PATH = "/home/emmanuel/projects/2020_rbig_rs/reports/figures/explore/vi/"
Data¶
# load dataset
df_anscombe = sns.load_dataset('anscombe')
df_anscombe.dataset.unique()
def get_case(df: pd.DataFrame, case: str='I'):
return df[df['dataset'] == case]
def plot_cases(df: pd.DataFrame, case: str='I', save=True, plot_type='reg'):
df = get_case(df, case)
plt.figure(figsize=(4,4))
if plot_type == 'reg':
pts = sns.regplot(
x="x",
y="y",
data=df,
)
elif plot_type == 'joint':
pts = sns.jointplot(
x="x",
y="y",
data=df,
kind="regplot",
)
elif plot_type == 'density':
pts = sns.jointplot(
x="x",
y="y",
data=df,
kind="kde",
)
else:
raise ValueError(f"Unrecognized plot_type: {plot_type}")
plt.xlabel("")
plt.ylabel("")
plt.xticks([])
plt.yticks([])
# plt.axis('off')
plt.tight_layout()
if save:
plt.savefig(SAVE_PATH + f'demo_case{case}_{plot_type}.png', dpi=200, transparent=True)
return None
plot_cases(df_anscombe, 'III', plot_type='reg')
Mathematics¶
There are a few important quantities to consider when we want to summarize the statistics of two datasets and compare them.
- Variance
- Covariance
- Correlation
- Root Mean Squared
Covariance¶
The covariance is a measure of how much two variables change together. The covariance between X and Y is given by:
C(X,Y) = \frac{1}{N}\sum_{i=1}^{N}(x_i - \mu_x)(y_i - \mu_y)
where N is the number of elements in both datasets. Notice how this formula assumes that the number of samples for X and Y is the same. The measure is unbounded: it can take any value between -\infty and \infty. Let's look at an example of how to calculate this below.
# covariance formula
def cov(X, Y):
n_samples = X.shape[0]
# get mean
X_mu = X.mean()
Y_mu = Y.mean()
cov_xy = 0
# loop through the data points
for ix in range(n_samples):
cov_xy += (X.values[ix] - X_mu) * (Y.values[ix] - Y_mu)
return cov_xy / n_samples
# extract the data
X = get_case(df_anscombe, 'I')['x']
Y = get_case(df_anscombe, 'I')['y']
# get covariance
cov_xy = cov(X,Y)
print(cov_xy)
X.values[:, None].reshape(-1, 1).shape
Refactor¶
We can remove the loop by doing a matrix multiplication:
C(X,Y) = \frac{1}{N}(X - \mu_x)^\top (Y - \mu_y)
where X,Y \in \mathbb{R}^{N\times 1}.
np.dot((X.values[:, None] - X.mean()).T, Y.values[:, None] - Y.mean()) / X.shape[0]
# covariance formula
def cov(X, Y):
n_samples = X.shape[0]
# get mean
X_mu = X.mean()
Y_mu = Y.mean()
# remove mean from data (copy so the inputs are not modified in place)
X = np.asarray(X, dtype=float) - X_mu
Y = np.asarray(Y, dtype=float) - Y_mu
# ensure 2D column vectors
X = X.reshape(-1, 1)
Y = Y.reshape(-1, 1)
# calculate the covariance
cov_xy = X.T @ Y
return (cov_xy / n_samples).item()
def test_anscombe(func, save_name=None):
fig, axs = plt.subplots(nrows=2, ncols=2, figsize=(7,5))
for iax, icase in zip(axs.ravel(), ['I', 'II', 'III', 'IV']):
# data
X = get_case(df_anscombe, icase)['x']
Y = get_case(df_anscombe, icase)['y']
output = func(X.values,Y.values)
iax.scatter(X.values, Y.values, label=f"Case {icase}: $C$={output:.2f}")
iax.legend()
# iax.legend(f"Case: {icase}")
# get covariance
# print(f"Case {icase}: {cov_xy.item()}")
plt.tight_layout()
if save_name is not None:
plt.savefig(SAVE_PATH + f"demo_{save_name}.png")
plt.show()
test_anscombe(cov, 'cov')
Multi-Variate (Multi-Dimensional)¶
np.random.seed(123)
X = np.random.randn(20, 2)
Y = 0.5 * X
# calculate the covariance matrix (use a new name so the cov() function above is not shadowed)
cov_mat = np.cov(X.squeeze(), Y.squeeze())
print(X.shape, Y.shape, cov_mat.shape)
cov_mat.shape
def cov_hs_features(X, Y):
# calculate covariance matrix
cov_xy = np.cov(X, Y)
# summarize the information with the Frobenius norm
cov_sum = np.linalg.norm(cov_xy, ord='fro')
return cov_sum
# ||X.T @ Y||_F - feature space
lhs = np.linalg.norm(X.T @ Y, ord='fro')**2
print(lhs)
# ||XX.T @ YY.T||_F - sample space
mhs = np.trace(X @ X.T @ Y @ Y.T)
print(mhs)
# ||X.T @ Y||_F - feature space
lhs = np.linalg.norm(np.cov(X,Y), ord='fro')**2
print(lhs)
# ||XX.T @ YY.T||_F - sample space
mhs = np.trace(X @ X.T @ Y @ Y.T)
print(mhs)
# RHS
raw = np.trace(X @ Y.T) / np.sqrt(np.trace(X @ X.T) * np.trace(Y @ Y.T))
print(raw)
Formula 1¶
\frac{tr(XY^\top)}{\sqrt{tr(XX^\top)tr(YY^\top)}}
# raw formula
raw = np.trace(X @ Y.T) / np.sqrt(np.trace(X @ X.T) * np.trace(Y @ Y.T))
print(raw)
# numerator
numer1 = np.trace(X @ Y.T)**2
numer2 = np.linalg.norm(X @ Y.T)
print(numer1, numer2)
Formula II¶
\frac{tr(XX^\top YY^\top)}{\sqrt{tr(XX^\top XX^\top)tr(YY^\top YY^\top)}}
# formula 2
S = X @ X.T
T = Y @ Y.T
raw = np.trace(S.T @ T) / np.sqrt(np.trace(S.T @ S) * np.trace(T.T @ T))
print(raw)
# numerator
numer1 = np.trace(S.T @ T)
numer2 = np.linalg.norm(S.T @ T)
print(numer1, numer2)
# denominator
denom1 = np.sqrt(np.trace(S.T @ S) * np.trace(T.T @ T))
denom2 = np.sqrt(np.linalg.norm(S.T @ S) * np.linalg.norm(T.T @ T))
print(denom1, denom2)
Proposed¶
\frac{tr(X^\top Y)}{\sqrt{tr(X^\top X)tr(Y^\top Y)}}
# proposed
raw = np.trace(X.T @ Y) / np.sqrt(np.trace(X.T @ X) * np.trace(Y.T @ Y))
print(raw)
# numerator
numer1 = np.trace(X.T @ Y)
numer2 = np.linalg.norm(X.T @ Y)
print(numer1, numer2)
cov_feat_norm = cov_hs_features(X, Y)
print(cov_feat_norm)
X_ft_norm = cov_hs_features(X,X)
Y_ft_norm = cov_hs_features(Y,Y)
corr_feat_norm = cov_feat_norm / np.sqrt(X_ft_norm * Y_ft_norm)
print(corr_feat_norm)
np.sum(np.cov(X.T) * np.cov(Y.T))  # Frobenius inner product of the two feature covariance matrices
def cov_hs_samples(X, Y):
# calculate samples covariance matrix
K_x = np.cov(X.T)
K_y = np.cov(Y.T)
# summarize
return np.sum(K_x * K_y)
cov_samp_norm = cov_hs_samples(X, Y)
print(cov_samp_norm)
X_samp_norm = cov_hs_samples(X,X)
Y_samp_norm = cov_hs_samples(Y,Y)
corr_samp_norm = cov_samp_norm / np.sqrt(X_samp_norm * Y_samp_norm)
print(corr_samp_norm)
cov_norm = cov_hs_features(X, Y)
print(cov_norm)
def get_linear_hsic(X, Y):
hsic_model = HSIC(kernel='linear', scorer='hsic', bias=True)
hsic_model.fit(X,Y);
hsic_score = hsic_model.score(X)
return hsic_score
def get_linear_cka(X, Y):
hsic_model = HSIC(kernel='linear', scorer='tka')
hsic_model.fit(X,Y);
hsic_score = hsic_model.score(X)
return hsic_score
cka_score = get_linear_cka(X, Y)
print(cka_score)
hsic_score = get_linear_hsic(X, Y)
print(hsic_score)
# Samples Covariance Trace
np.trace(np.cov(X.T) @ np.cov(Y.T))
# Feature Covariance Trace
np.linalg.norm(np.cov(X,Y), ord='fro')
np.linalg.norm(X.T @ Y, ord='fro')
def corr_hs(X, Y):
# summarize the cross-covariance (using the feature-space version defined above)
cov_sum = cov_hs_features(X, Y)
# summarize each dataset with itself
X_sum = cov_hs_features(X, X)
Y_sum = cov_hs_features(Y, Y)
# calculate the correlation
return cov_sum / np.sqrt(X_sum * Y_sum)
corr_sum = corr_hs(X,Y)
print(corr_sum)
# calculate empirical covariance
# calculate the (uncentered) empirical cross-covariance, again avoiding the name cov
cov_xy = X.T @ Y
assert cov_xy.shape == (X.shape[1], Y.shape[1])
cov_xy
Correlation¶
This is the normalized version of the covariance measure mentioned above. It is obtained by dividing the covariance by the product of the standard deviations of the two samples X and Y. So the formulation is:
\rho(X,Y) = \frac{C(X,Y)}{\sigma_X \sigma_Y}
With this normalization, we now have a measure that is bounded between -1 and 1. This makes it much more interpretable and also invariant to isotropic scaling, \rho(X,Y)=\rho(\alpha X, \beta Y) where \alpha, \beta \in \mathbb{R}^{+}.
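As a quick sanity check of the scale-invariance property, here is a minimal sketch using np.corrcoef; the data and the scaling factors \alpha=3, \beta=7 are arbitrary choices for illustration.
# sanity check: the correlation is invariant to isotropic scaling
rng = np.random.RandomState(0)
x_demo = rng.randn(100)
y_demo = 0.5 * x_demo + 0.1 * rng.randn(100)
alpha, beta = 3.0, 7.0
rho_orig = np.corrcoef(x_demo, y_demo)[0, 1]
rho_scaled = np.corrcoef(alpha * x_demo, beta * y_demo)[0, 1]
# the two values should agree up to floating point error
print(rho_orig, rho_scaled)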
def corr(X, Y):
# get standard deviation
X_std, Y_std = X.std(), Y.std()
# calculate the covariance
cov_xy = cov(X, Y)
# calculate the correlation
return (cov_xy / (X_std * Y_std)).item()
corr_xy = corr(X, Y)
print(corr_xy)
test_anscombe(corr, 'corr')
Root Mean Squared¶
This is a popular measure of the error between two datasets. Roughly speaking, it aggregates the point-wise deviations between the datasets and penalizes larger deviations more heavily.
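For reference, the standard definition (which the code below implements) is:
RMSE(X,Y) = \sqrt{\frac{1}{N}\sum_{i=1}^{N}(x_i - y_i)^2}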
# RMSE formula
def rmse(X, Y):
# calculate the mean of the squared deviations between the datasets
mse_xy = np.average((X - Y) ** 2, axis=0)
return np.sqrt(mse_xy)
rmse_xy = rmse(X, Y)
print(rmse_xy)
Refactor¶
The scikit-learn library has a built-in mean_squared_error function, which you can call and then apply np.sqrt to the output.
from sklearn.metrics import mean_squared_error
def rmse(X, Y):
# calculate the squared covariance
rmse_xy = mean_squared_error(X, Y)
return np.sqrt(rmse_xy)
rmse_xy = rmse(X,Y)
print(rmse_xy)
test_anscombe(rmse, 'rmse')
HSIC¶
def get_linear_hsic(X, Y):
hsic_model = HSIC(kernel='linear', scorer='hsic', bias=True)
hsic_model.fit(X[:, None],Y[:, None]);
hsic_score = hsic_model.score(X[:, None])
return hsic_score
hsic_score = get_linear_hsic(X,Y)
print(hsic_score)
test_anscombe(get_linear_hsic, 'hsic_lin')
RBF Kernel¶
def get_rbf_hsic(X, Y):
hsic_model = HSIC(kernel='rbf', scorer='hsic')
hsic_model.fit(X[:, None],Y[:, None]);
hsic_score = hsic_model.score(X[:, None])
return hsic_score
Kernel Alignment¶
Linear Kernel¶
def get_linear_ka(X, Y):
hsic_model = HSIC(kernel='linear', scorer='tka')
hsic_model.fit(X[:, None],Y[:, None]);
hsic_score = hsic_model.score(X[:, None])
return hsic_score
test_anscombe(get_linear_ka, 'cka_lin')
RBF Kernel¶
def get_rbf_ka(X, Y):
hsic_model = HSIC(kernel='rbf', scorer='tka')
hsic_model.fit(X[:, None],Y[:, None]);
hsic_score = hsic_model.score(X[:, None])
return hsic_score
test_anscombe(get_rbf_ka, 'ka_rbf')
Mutual Information¶
In this section, I will do the same thing as before, except this time I will use the equivalent information-theoretic measures. In principle, these should be better at capturing non-linear relationships, and they will let me add different representations using spatial-temporal information.
Entropy¶
This is the simplest and it is analogous to the standard deviation \sigma. Entropy is defined by
H(X) = -\int_{\mathcal{X}} f(x) \log f(x) \, dx
This is the expected amount of uncertainty present in a given distribution function f(X). It captures the amount of surprise within a distribution: if the probability mass is spread over many low-probability events, the expected uncertainty is high, with the uniform distribution being the extreme case of maximum entropy for a given support. Conversely, a sharply peaked distribution has low entropy because there are few surprising events.
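As a quick numerical illustration (using scipy.stats.entropy, which is also used below; the two discrete distributions here are made up for the example):
import scipy.stats
# a sharply peaked distribution vs a uniform one over 4 outcomes
p_peaked = [0.97, 0.01, 0.01, 0.01]
p_uniform = [0.25, 0.25, 0.25, 0.25]
print(scipy.stats.entropy(p_peaked))   # low entropy: one outcome dominates, little surprise
print(scipy.stats.entropy(p_uniform))  # maximum entropy for 4 outcomes (log 4)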
kde = smi.nonparametric.KDEUnivariate(Y)
kde.fit()
print(kde.entropy)
plt.plot(kde.support, kde.density)
import scipy.stats
def entropy(data, method='counts'):
if method == 'counts':
_, pdata = np.unique(data, return_counts=True)
entropy = scipy.stats.entropy(pdata)
elif method == 'kde':
kde = smi.nonparametric.KDEUnivariate(data)
kde.fit()
entropy = kde.entropy
else:
raise ValueError('Unrecognized method.')
return entropy
Hx = entropy(X, 'counts')
Hy = entropy(Y, 'counts')
print(Hx, Hy)
Mutual Information¶
The mutual information between X and Y is defined as
I(X,Y) = \int_{\mathcal{X}} \int_{\mathcal{Y}} p(x,y) \log \frac{p(x,y)}{p_x(x)p_y(y)} \, dx \, dy
where p(x,y) is the joint probability and p_x(x), p_y(y) are the marginal probabilities of X and Y respectively. We can also express the mutual information in terms of the entropy H(\cdot):
I(X,Y)=H(X) + H(Y) - H(X,Y)
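Before using the KDE-based estimator below, here is a minimal discrete sketch of this identity using a joint histogram; the sample size, noise level, and bin count are arbitrary choices for illustration.
# discrete sketch of I(X,Y) = H(X) + H(Y) - H(X,Y) via a joint histogram
import scipy.stats
rng = np.random.RandomState(1)
x_demo = rng.randn(1000)
y_demo = x_demo + 0.5 * rng.randn(1000)
pxy, _, _ = np.histogram2d(x_demo, y_demo, bins=5)
pxy = pxy / pxy.sum()                      # joint probabilities
px, py = pxy.sum(axis=1), pxy.sum(axis=0)  # marginal probabilities
Hx_d, Hy_d = scipy.stats.entropy(px), scipy.stats.entropy(py)
Hxy_d = scipy.stats.entropy(pxy.ravel())
print(Hx_d + Hy_d - Hxy_d)                 # non-negative, larger for stronger dependence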
def mutual_info(X,Y, method='kde'):
Hx = entropy(X, method)
Hy = entropy(Y, method)
# NOTE: pooling the samples gives the entropy of the 1D mixture, not the true joint entropy H(X,Y);
# a proper estimate would require a bivariate density
Hxy = entropy(np.concatenate((X,Y)), method)
return Hx + Hy - Hxy
Hxy = entropy(pd.concat((X,Y)))
mi_xy = mutual_info(X.values, Y.values)
print(mi_xy)
test_anscombe(mutual_info, 'kde')
def norm_mutual_info(X,Y, method='kde'):
Hx = entropy(X, method)
Hy = entropy(Y, method)
Hxy = entropy(np.concatenate((X,Y)), method)
# mutual information
mi_xy = Hx + Hy - Hxy
return (mi_xy / (np.sqrt(Hx * Hy)))
test_anscombe(norm_mutual_info, 'nkde')
def red_mutual_info(X,Y, method='kde'):
Hx = entropy(X, method)
Hy = entropy(Y, method)
Hxy = entropy(np.concatenate((X,Y)), method)
# mutual information
mi_xy = Hx + Hy - Hxy
return (2 * mi_xy / (Hx + Hy))
test_anscombe(red_mutual_info, 'rkde')
Variation of Information¶
$$\begin{aligned} VI(X,Y) &= H(X) + H(Y) - 2I(X,Y) \\ &= I(X,X) + I(Y,Y) - 2I(X,Y) \end{aligned}$$
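As a quick sanity check of this identity, here is a small sketch on a made-up discrete joint distribution (the table values are arbitrary):
# check VI(X,Y) = H(X) + H(Y) - 2 I(X,Y) on a 2x2 joint distribution
import scipy.stats
pxy_demo = np.array([[0.30, 0.05], [0.10, 0.55]])
px_d, py_d = pxy_demo.sum(axis=1), pxy_demo.sum(axis=0)
Hx_d, Hy_d = scipy.stats.entropy(px_d), scipy.stats.entropy(py_d)
Hxy_d = scipy.stats.entropy(pxy_demo.ravel())
mi_d = Hx_d + Hy_d - Hxy_d
print(Hx_d + Hy_d - 2 * mi_d)    # equivalently 2*H(X,Y) - H(X) - H(Y)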
def variation_info(X,Y, method='kde'):
Hx = entropy(X, method)
Hy = entropy(Y, method)
Hxy = entropy(np.concatenate((X,Y)), method)
# mutual information
mi_xy = Hx + Hy - Hxy
# variation of information
vi_xy = Hx + Hy - 2 * mi_xy
return vi_xy
test_anscombe(variation_info, 'vikde')
RVI-Based Diagram¶
The diagram is based on a law-of-cosines style decomposition of the variation of information:
VI(X,Y) = H(X) + H(Y) - 2\sqrt{H(X)H(Y)}\,\rho(X,Y)
where the sides are as follows:
- a = H(X) - the entropy of the observed data (the analogue of \sigma_{\text{obs}})
- b = H(Y) - the entropy of the simulated data (the analogue of \sigma_{\text{sim}})
- \rho = \frac{I(X,Y)}{\sqrt{H(X)H(Y)}} - the normalized mutual information (the analogue of the correlation)
- VI(X,Y) - the variation of information between the two datasets (the analogue of the RMSE)
h_a = entropy(X, 'kde')
h_b = entropy(Y, 'kde')
print('H(X),H(Y):',h_a, h_b)
# joint entropy
h_ab = entropy(pd.concat((X,Y)), 'kde')
print('H(X,Y):',h_ab)
# mutual information
mi_ab = h_a + h_b - h_ab
print('MI(X,Y):', mi_ab)
# normalized mutual information
nmi_ab = mi_ab / np.sqrt(h_a * h_b)
print('NMI(X,Y):', nmi_ab)
# scaled mutual info
smi_ab = mi_ab * (h_ab / (h_a * h_b))
print('SMI(X,Y):', smi_ab)
# cos rho term
c_ab = 2 * smi_ab - 1
print('C_XY:', c_ab)
# vi
vi = h_a + h_b - 2 * np.sqrt(h_a * h_b) * nmi_ab
print('VI(X,Y):',vi)
def vi_coeffs(X, Y, method='counts'):
# entropy observations
h_a = entropy(X, method)
# entropy simulated
h_b = entropy(Y, method)
# joint entropy
h_ab = entropy(pd.concat((X,Y)), method)
# mutual information
mi_ab = h_a + h_b - h_ab
# normalized mutual information
nmi_ab = mi_ab / np.sqrt(h_a * h_b)
# scaled mutual information
smi_ab = 2 * mi_ab * (h_ab / (h_a * h_b)) - 1
# vi
vi_ab = h_a + h_b - 2 * np.sqrt((h_a * h_b)) * nmi_ab
# save coefficients
data = {
'h_a': h_a,
'h_b': h_b,
'nmi': nmi_ab,
'smi': smi_ab,
'theta': np.arccos(nmi_ab),
'vi': vi_ab
}
return data
# Model I
X = get_case(df_anscombe, 'I')['x']
Y = get_case(df_anscombe, 'I')['y']
data1 = vi_coeffs(X, Y, 'kde')
print(data1)
# Model II
X = get_case(df_anscombe, 'II')['x']
Y = get_case(df_anscombe, 'II')['y']
data2 = vi_coeffs(X, Y, 'kde')
print(data2)
# Model III
X = get_case(df_anscombe, 'III')['x']
Y = get_case(df_anscombe, 'III')['y']
data3 = vi_coeffs(X, Y, 'kde')
print(data3)
# # Model IV
# X = get_case(df_anscombe, 'IV')['x']
# Y = get_case(df_anscombe, 'IV')['y']
# data4 = vi_coeffs(X, Y)
# print(data4)
import matplotlib.pyplot as plt
import numpy as np
theta = np.linspace(0,np.pi)
r = np.sin(theta)
fig = plt.figure(figsize=(7,5))
ax = fig.add_subplot(111, polar=True)
m = ax.scatter(0, data1['h_a'], s=200, alpha=0.75, label='Data', zorder=0)
m1 = ax.scatter(data1['theta'], data1['h_b'], s=150, alpha=0.75, marker='x', label='Model I')
m1 = ax.scatter(data2['theta'], data2['h_b'], s=150, alpha=0.75, marker='o', label='Model II')
m1 = ax.scatter(data3['theta'], data3['h_b'], s=150, alpha=0.75, marker='.', label='Model III')
# m1 = ax.scatter(theta4, b4, s=100, alpha=0.75, marker='o', label='Model II')
# ax.plot(0)
ax.set_ylim([0, 3])
# ax.set_xticks([0.1, 0.2, 0.3, 0.9])
# ax.set_xticklabels([1.0, 0.9, 0.8, 0.6, 0.3, 0.2, 0.1])
# m1 = ax.scatter(theta1, a, s=50, alpha=0.75)
# m1 = ax.scatter(theta1, a, s=50, alpha=0.75)
c = ax.plot(theta, data1['h_a'] * np.ones(theta.shape), color='black', linestyle='dashed', alpha=0.75)
ax.set_xlabel('Entropy', labelpad=20)
ax.set_ylabel('Entropy', labelpad=20)
plt.legend()
ax.set_thetamin(0)
ax.set_thetamax(90)
plt.tight_layout()
plt.savefig(SAVE_PATH + 'demo_vi.png')
plt.show()