Skip to content

Exploring: Variation of Information

Background

My projects involve trying to compare the outputs of different climate models. There are currently more than 20+ climate models from different companies and each of them try to produce the most accurate prediction of some physical phenomena, e.g. Sea Surface Temperature, Mean Sea Level Pressure, etc. However, it's a difficult task to provide accurate comparison techniques for each of the models. There exist some methods such as the mean and standard deviation. There is also a very common framework of visually summarizing this information in the form of Taylor Diagrams. However, the drawback of using these methods is that they are typically non-linear methods and they cannot handle multidimensional, multivariate data.

Another way to measure similarity would be in the family of Information Theory Measures (ITMs). Instead of directly measuring first-order output statistics, these methods summarize the information via a probability distribution function (PDF) of the dataset. These can measure non-linear relationships and are naturally multivariate that offers solutions to the shortcomings of the standard methods. I would like to explore this and see if this is a useful way of summarizing information.

This is removing the

Code

import numpy as np
import seaborn as sns
import pandas as pd
import statsmodels.api as smi


import matplotlib.pyplot as plt
plt.style.use('seaborn-talk')

%matplotlib inline
%load_ext autoreload
%autoreload 2
The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
SAVE_PATH = "/home/emmanuel/projects/2020_rbig_rs/reports/figures/explore/vi/"

Data

We will use the classic dataset for Anscombe's quartet. This is a staple dataset which shows how we need to take care when comparing two datasets. In the example, we will show how visually, two datasets will look similar, but using a correlation measure like the Pearson's coefficient will fail because it is not able to capture the non-linear relationship between the two distributions.

# load dataset
df_anscombe = sns.load_dataset('anscombe')
df_anscombe.dataset.unique()
array(['I', 'II', 'III', 'IV'], dtype=object)
def get_case(df: pd.DataFrame, case: str='I'):
    return df[df['dataset'] == case]
def plot_cases(df: pd.DataFrame, case: str='I', save=True):

    df = get_case(df, case)

    plt.figure(figsize=(4,4))
    pts = sns.jointplot(
        x="x", 
        y="y", 
        data=df, 
        kind="regplot",
    )
    plt.tight_layout()
    if save is not None:
        plt.savefig(SAVE_PATH + f'demo_case{case}.png')
    return None
plot_cases(df_anscombe, 'III')
<Figure size 288x288 with 0 Axes>

This is a very simple case where we have a linear relationship between the datasets. The regression plot above shows a linear line that is fit between the two distributions. We can also see the marginal distributions (the histograms) for X and Y. As you can see, they are definitely similar. But now, we are going to look at a way to summarize this information.

Mathematics

There are a few important quantities to consider when we need to represent the statistics and compare two datasets.

  • Variance
  • Covariance
  • Correlation
  • Root Mean Squared

Covariance

The covariance is a measure to determine how much two variances change. The covariance between X and Y is given by:

C(X,Y)=\frac{1}{N}\sum_{i=1}^N (x_i - \mu_x)(y_i - \mu_i)

where N is the number of elements in both datasets. Notice how this formula assumes that the number of samples for X and Y are equivalent. This measure is unbounded as it can have a value between -\infty and \infty. Let's look at an example of how to calculate this below.

# covariance formula
def cov(X, Y):

    n_samples = X.shape[0]

    # get mean
    X_mu = X.mean()
    Y_mu = Y.mean()

    cov_xy = 0

    # loop through the data points
    for ix in range(n_samples):
        cov_xy += (X.values[ix] - X_mu) * (Y.values[ix] - Y_mu)


    return cov_xy / n_samples
# extract the data
X = get_case(df_anscombe, 'I')['x']
Y = get_case(df_anscombe, 'I')['y']

# get covariance
cov_xy = cov(X,Y)

print(cov_xy)
5.000909090909091
X.values[:, None].reshape(-1, 1).shape
(11, 1)

That number is fairly meaningless now. But we can compare the covariance number of this versus the other cases.

Refactor

We can remove the loop by doing a matrix multiplication.

C(X,Y)=\frac{1}{N} (X-X_\mu)^\top (Y-Y_\mu)

where X,Y \in \mathbb{R}^{N\times 1}

np.dot(X[:, None].T-X.mean(), Y[:, None]-Y.mean())/X.shape[0]

# covariance formula
def cov(X, Y):

    n_samples = X.shape[0]

    # get mean
    X_mu = X.mean()
    Y_mu = Y.mean()

    # remove mean from data
    X -= X_mu
    Y -= Y_mu

    # Ensure 2d
    X = np.atleast_2d(X).reshape(-1, 1)
    Y = np.atleast_2d(Y).reshape(-1, 1)

    # calculate the covariance
    cov_xy = X.T @ Y    
    return (cov_xy / n_samples).item()
def test_anscombe(func, save_name=None):

    fig, axs = plt.subplots(nrows=2, ncols=2, figsize=(7,5))

    for iax, icase in zip(axs.ravel(), ['I', 'II', 'III', 'IV']):

        # data
        X = get_case(df_anscombe, icase)['x']
        Y = get_case(df_anscombe, icase)['y']

        output = func(X.values,Y.values)

        iax.scatter(X.values, Y.values, label=f"Case {icase}: $C$={output:.2f}")
        iax.legend()
    #     iax.legend(f"Case: {icase}")
        # get covariance


    #     print(f"Case {icase}: {cov_xy.item()}")
    plt.tight_layout()

    if save_name is not None:
        plt.savefig(SAVE_PATH + f"demo_{save_name}.png")

    plt.show()
test_anscombe(cov, 'cov')

So, we see that the covariance doesn't seem to change very much between datasets.

Multi-Dimensional

X = np.random.rand(20, 5)
Y = 0.5 * X
# calculate empirical covariance
cov = X.T @ Y
assert cov.shape == (X.shape[1], Y.shape[1])

cov
array([[3.18630117, 2.56365049, 2.57152415, 2.56805358, 2.11704571],
       [2.56365049, 3.02897306, 2.48391556, 2.41456641, 2.52907783],
       [2.57152415, 2.48391556, 3.6830925 , 2.86734156, 2.76159258],
       [2.56805358, 2.41456641, 2.86734156, 3.31376171, 2.55167058],
       [2.11704571, 2.52907783, 2.76159258, 2.55167058, 3.3544363 ]])
np.linalg.eigvals(cov).sum()
16.566564739213316
np.trace(cov)
16.566564739213327

Correlation

This is the normalized version of the covariance measured mentioned above. This is done by dividing the covariance by the product of the standard deviation of the two samples X and Y. So the forumaltion is:

\rho(X, Y) = \frac{C(X,Y)}{\sigma_x \sigma_y}

With this normalization, we now have a measure that is bounded between -1 and 1. This makes it much more interpretable and also invariant to isotropic scaling, \rho(X,Y)=\rho(\alpha X, \beta Y) where \alpha, \beta \in \mathbb{R}^{+}

def corr(X, Y):

    # get standard deviation
    X_std, Y_std = X.std(), Y.std()

    # calculate the correlation
    cov_xy = cov(X, Y)

    # calculate the correlation
    return (cov_xy / (X_std * Y_std)).item()
corr_xy = corr(X, Y)

print(corr_xy)
0.7420788540814529

Now that it is bounded between -1 and 1, this value let's us know that this value is equivalent to being close to 1. So fairly similar.

test_anscombe(corr, 'corr')

So at this point, this is a bit of a red flag. All of the \rho values are the same but we can see very clearly that there are some key differences between the distributions. The covariance nor the correlation measure gave us useful information.

Root Mean Squared

This is a popular measure for measuring the errors between two datasets. More or less, it is a covariance measure that penalizes higher deviations between the datasets.

# covariance formula
def rmse(X, Y):

    n_samples = X.shape[0]

    # get mean
    X_mu = X.mean()
    Y_mu = Y.mean()

    # remove mean from data
    X -= X_mu
    Y -= Y_mu

    # calculate the squared covariance
    cov_xy = np.average((X - Y) ** 2, axis=0)   

    return np.sqrt((cov_xy))
rmse_xy = rmse(X, Y)

print(rmse_xy)
1.9373411958379736

Refactor

The scikit-learn library has a built-in mean_sqared_error function which you can call and then use the np.sqrt on the output.

from sklearn.metrics import mean_squared_error
def rmse(X, Y):
    # calculate the squared covariance
    rmse_xy = mean_squared_error(X, Y)  

    return np.sqrt(rmse_xy)
rmse_xy = rmse(X,Y)
print(rmse_xy)
1.9373411958379736
test_anscombe(rmse, 'rmse')

Again, the same story with a different measurement; no change per dataset.

Taylor Diagram

The Taylor Diagram was a way to summarize the data statistics in a way that was easy to interpret. It used the relationship between the covariance, the correlation and the root mean squared error via the triangle inequality. Assuming we can draw a diagram using the law of cosines;

c^2 = a^2 + b^2 - 2ab \cos \phi

we can write this in terms of \sigma, \rho and RMSE as we have expressed above.

\text{RMSE}^2 = \sigma_{\text{obs}}^2 + \sigma_{\text{sim}}^2 - 2 \sigma_r \sigma_t \rho

The sides are as follows:

  • a = \sigma_{\text{obs}} - the standard deviation of the observed data
  • b = \sigma_{\text{sim}} - the standard deviation of the simulated data
  • \rho - the correlation coefficient
  • RMSE - the root mean squared difference between the two datasets

So let's do a quick example where we calculate these quantities

def taylor_coeffs(X, Y, means=False):

    # subtract means
    if means:
        X = X - X.mean()
        Y = Y - Y.mean()

    # std observations
    a = X.std()

    # std simulated
    b = Y.std()

    # correlation coefficient
    corr_ab = corr(X, Y)

    # RMSE
    rmse_ab = rmse(X, Y)

    # save coefficients
    data = {
        'a': a,
        'b': b,
        'rho': corr_ab,
        'theta': np.arccos(corr_ab),
        'rmse': rmse_ab
    }
    return data
# Model I
X = get_case(df_anscombe, 'I')['x']
Y = get_case(df_anscombe, 'I')['y']
data1 = taylor_coeffs(X, Y)

# Model II
X = get_case(df_anscombe, 'II')['x']
Y = get_case(df_anscombe, 'II')['y']
data2 = taylor_coeffs(X, Y)

# Model III
X = get_case(df_anscombe, 'III')['x']
Y = get_case(df_anscombe, 'III')['y']
data3 = taylor_coeffs(X, Y)

# # Model IV
# X = get_case(df_anscombe, 'IV')['x']
# Y = get_case(df_anscombe, 'IV')['y']
# data4 = taylor_coeffs(X, Y)
np.arccos(0.9)
0.45102681179626236
import matplotlib.pyplot as plt
import numpy as np

theta = np.linspace(0,np.pi)
r = np.sin(theta)

fig = plt.figure(figsize=(7,5))
ax = fig.add_subplot(111, polar=True)
m = ax.scatter(0, data1['a'], s=100, alpha=0.75, label='Data', zorder=0)
m1 = ax.scatter(data1['theta'], data1['b'], s=100, alpha=0.75, marker='x', label='Model I')
m1 = ax.scatter(data2['theta'], data2['b'], s=100, alpha=0.75, marker='+', label='Model II')
m1 = ax.scatter(data3['theta'], data3['b'], s=100, alpha=0.75, marker='.', label='Model III')
# m1 = ax.scatter(data4['theta'], data4['b'], s=100, alpha=0.75, marker='o', label='Model II')
# ax.plot(0)
ax.set_ylim([0, 5])
# ax.set_xticks([0.1, 0.2, 0.3, 0.9])
# ax.set_xticklabels([1.0, 0.9, 0.8, 0.6, 0.3, 0.2, 0.1])
# m1 = ax.scatter(theta1, a, s=50, alpha=0.75)
# m1 = ax.scatter(theta1, a, s=50, alpha=0.75)
c = ax.plot(theta, data1['a'] * np.ones(theta.shape), color='black', linestyle='dashed', alpha=0.75)
ax.set_xlabel('Standard Deviation', labelpad=20)
ax.set_ylabel('Standard Deviation', labelpad=20)
plt.legend()
ax.set_thetamin(0)
ax.set_thetamax(90)
plt.tight_layout()
plt.savefig(SAVE_PATH + 'demo_taylor.png')
plt.show()
data1['b'], data1['rho']
(2.031568135925815, 0.7422004694043999)
fig = plt.figure(figsize=(25,7))
dia = TaylorDiagram(data1['a'], fig, rect=122, label='Model', extend=False)
dia.samplePoints[0].set_color('r')
dia.add_sample(data1['b'], data1['rho'], marker='x', ms=10, ls='-',
                       mfc='k', mec='k', label='Model I')
dia.add_sample(data2['b'], data2['rho'], marker='x', label='Model I')
plt.scatter(theta1, b1, s=100, alpha=0.75, marker='x', label='Model II')
plt.scatter(theta2, b2, s=100, alpha=0.75, marker='+', label='Model II')
plt.scatter(theta3, b3, s=100, alpha=0.75, marker='.', label='Model II')
plt.scatter(theta4, b4, s=100, alpha=0.75, marker='o', label='Model II')
# contours = dia.add_contours(levels=5, colors='0.5')
# plt.clabel(contours, inline=1, fontsize=10, fmt='%.0f')
dia.add_grid()
fig.legend(dia.samplePoints,
               [ p.get_label() for p in dia.samplePoints ],
               numpoints=1, prop=dict(size='small'), loc='upper right')
plt.show()

Mutual Information

In this section, I will be doing the same thing as before except this time I will use the equivalent Information Theory Measures. In principle, they should be better at capturing non-linear relationships and I will be able to add different representations using spatial-temporal information.

Entropy

This is the simplest and it is analogous to the standard deviation \sigma. Entropy is defined by

H(X) = - \int_{X} f(x) \log f(x) dx

This is the expected amount of uncertainty present in a given distributin function f(X). It captures the amount of surprise within a distribution. So if there are a large number of low probability events, then the expected uncertainty will be higher. Whereas distributions with fairly equally likely events will have low entropy values as there are not many surprise events, e.g. Uniform.

kde = smi.nonparametric.KDEUnivariate(Y)
kde.fit()
print(kde.entropy)
plt.plot(kde.support, kde.density)
1.9670005831544313
[<matplotlib.lines.Line2D at 0x7faec83543c8>]
import scipy.stats

def entropy(data, method='counts'):


    if method == 'counts':
        _, pdata = np.unique(data, return_counts=True)
        entropy = scipy.stats.entropy(pdata)
    elif method == 'kde':
        kde = smi.nonparametric.KDEUnivariate(data)
        kde.fit()
        entropy = kde.entropy
    else:
        raise ValueError('Unrecognized method.')
    return entropy
Hx = entropy(X, 'counts')
Hy = entropy(Y, 'counts')

print(Hx, Hy)
2.3978952727983707 2.3978952727983707

Mutual Information

Given two distributions X and Y, we can calculate the mutual information as

I(X,Y) = \int_X\int_Y p(x,y) \log \frac{p(x,y)}{p_x(x)p_y(y)}dxdy

where p(x,y) is the joint probability and p_x(x), p_y(y) are the marginal probabilities of X and Y respectively. We can also express the mutual information as a function of the Entropy H(X)

I(X,Y)=H(X) + H(Y) - H(X,Y)

def mutual_info(X,Y, method='kde'):

    Hx = entropy(X, method)
    Hy = entropy(Y, method)
    Hxy = entropy(np.concatenate((X,Y)), method)
    return Hx + Hy - Hxy
Hxy = entropy(pd.concat((X,Y)))

mi_xy = mutual_info(X.values, Y.values)

print(mi_xy)
2.203120400100416
test_anscombe(mutual_info, 'kde')
/home/emmanuel/.conda/envs/it4dnn/lib/python3.6/site-packages/statsmodels/sandbox/nonparametric/kernels.py:204: RuntimeWarning: divide by zero encountered in double_scalars
  w = 1. / (h * n) * np.sum(self((xs-x)/h), axis=0)
/home/emmanuel/.conda/envs/it4dnn/lib/python3.6/site-packages/statsmodels/sandbox/nonparametric/kernels.py:204: RuntimeWarning: divide by zero encountered in true_divide
  w = 1. / (h * n) * np.sum(self((xs-x)/h), axis=0)
/home/emmanuel/.conda/envs/it4dnn/lib/python3.6/site-packages/statsmodels/sandbox/nonparametric/kernels.py:204: RuntimeWarning: invalid value encountered in multiply
  w = 1. / (h * n) * np.sum(self((xs-x)/h), axis=0)
/home/emmanuel/.conda/envs/it4dnn/lib/python3.6/site-packages/statsmodels/nonparametric/kde.py:232: IntegrationWarning: The occurrence of roundoff error is detected, which prevents 
  the requested tolerance from being achieved.  The error may be 
  underestimated.
  return -integrate.quad(entr, a, b, args=(endog,))[0]
def norm_mutual_info(X,Y, method='kde'):

    Hx = entropy(X, method)
    Hy = entropy(Y, method)
    Hxy = entropy(np.concatenate((X,Y)), method)

    # mutual information
    mi_xy =  Hx + Hy - Hxy
    return (mi_xy / (np.sqrt(Hx * Hy)))
test_anscombe(norm_mutual_info, 'nkde')
def red_mutual_info(X,Y, method='kde'):

    Hx = entropy(X, method)
    Hy = entropy(Y, method)
    Hxy = entropy(np.concatenate((X,Y)), method)

    # mutual information
    mi_xy =  Hx + Hy - Hxy
    return (2 * mi_xy / (Hx + Hy))
test_anscombe(red_mutual_info, 'rkde')

Variation of Information

$$ \begin{aligned} VI(X,Y) &= H(X) + H(Y) - 2I(X,Y) \ &= I(X,X) + I(Y,Y) - 2I(X,Y) \end{aligned}$$

def variation_info(X,Y, method='kde'):

    Hx = entropy(X, method)
    Hy = entropy(Y, method)
    Hxy = entropy(np.concatenate((X,Y)), method)

    # mutual information
    mi_xy =  Hx + Hy - Hxy

    # variation of information
    vi_xy = Hx + Hy - 2 * mi_xy
    return vi_xy
test_anscombe(variation_info, 'vikde')

RVI-Based Diagram

Analagous to the Taylor Diagram, we can summarize the ITMs in a way that was easy to interpret. It used the relationship between the entropy, the mutual information and the normalized mutual information via the triangle inequality. Assuming we can draw a diagram using the law of cosines;

c^2 = a^2 + b^2 - 2ab \cos \phi

we can write this in terms of \sigma, \rho and RMSE as we have expressed above.

\begin{aligned} \text{RVI}^2 &= H(X) + H(Y) - 2 \sqrt{H(X)H(Y)} \frac{I(X,Y)}{\sqrt{H(X)H(Y)}} \\ &= H(X) + H(Y) - 2 \sqrt{H(X)H(Y)} \rho \end{aligned}

where The sides are as follows:

  • a = \sigma_{\text{obs}} - the entropy of the observed data
  • b = \sigma_{\text{sim}} - the entropy of the simulated data
  • \rho = \frac{I(X,Y)}{\sqrt{H(X)H(Y)}} - the normalized mutual information
  • RMSE - the variation of information between the two datasets
h_a = entropy(X, 'counts')
h_b = entropy(Y, 'kde')
print('H(X),H(Y):',h_a, h_b)

# joint entropy
h_ab = entropy(pd.concat((X,Y)), 'kde')
print('H(X,Y):',h_ab)

# mutual information
mi_ab = h_a + h_b - h_ab
print('MI(X,Y):', mi_ab)

# normalized mutual information
nmi_ab = mi_ab / np.sqrt(h_a * h_b)
print('NMI(X,Y):', nmi_ab)

# scaled mutual info
smi_ab = mi_ab * (h_ab / (h_a * h_b))
print('SMI(X,Y):', smi_ab)

# cos rho term
c_ab = 2 * smi_ab - 1
print('C_XY:', c_ab)

# vi
vi = h_a + h_b  - 2 * np.sqrt(h_a * h_b) * nmi_ab

print('VI(X,Y):',vi)
H(X),H(Y): 2.3978952727983707 1.967000583154429
H(X,Y): 2.51573500302577
MI(X,Y): 1.8491608529270298
NMI(X,Y): 0.8514464531077769
SMI(X,Y): 0.986290574938243
C_XY: 0.972581149876486
VI(X,Y): 0.6665741500987403

def vi_coeffs(X, Y, method='counts'):

    # entropy observations
    h_a = entropy(X, method)

    # entropy simulated
    h_b = entropy(Y, method)

    # joint entropy
    h_ab = entropy(pd.concat((X,Y)), method)    

    # mutual information
    mi_ab = h_a + h_b - h_ab

    # normalized mutual information
    nmi_ab = mi_ab / np.sqrt(h_a * h_b)

    # scaled mutual information
    smi_ab = 2 * mi_ab * (h_ab / (h_a * h_b)) - 1

    # vi
    vi_ab = h_a + h_b  - 2 * np.sqrt((h_a * h_b)) * nmi_ab

    # save coefficients
    data = {
        'h_a': h_a,
        'h_b': h_b,
        'nmi': nmi_ab,
        'smi': smi_ab,
        'theta': np.arccos(nmi_ab),
        'vi': vi_ab
    }
    return data
# Model I
X = get_case(df_anscombe, 'I')['x']
Y = get_case(df_anscombe, 'I')['y']
data1 = vi_coeffs(X, Y, 'kde')
print(data1)

# Model II
X = get_case(df_anscombe, 'II')['x']
Y = get_case(df_anscombe, 'II')['y']
data2 = vi_coeffs(X, Y, 'kde')
print(data2)

# Model III
X = get_case(df_anscombe, 'III')['x']
Y = get_case(df_anscombe, 'III')['y']
data3 = vi_coeffs(X, Y, 'kde')
print(data3)

# # Model IV
# X = get_case(df_anscombe, 'IV')['x']
# Y = get_case(df_anscombe, 'IV')['y']
# data4 = vi_coeffs(X, Y)
# print(data4)
{'h_a': 2.7518548199717574, 'h_b': 2.2080793522856723, 'nmi': 0.9751276249886281, 'smi': 1.0224173292681424, 'theta': 0.2235002023733858, 'vi': 0.15251984951237407}
{'h_a': 2.7518548199717574, 'h_b': 2.1269598407974937, 'nmi': 0.9842327930994981, 'smi': 1.0321990011233386, 'theta': 0.1778134760779669, 'vi': 0.11647649545914085}
{'h_a': 2.7518548199717574, 'h_b': 1.967000583154429, 'nmi': 0.9469416711631927, 'smi': 1.0478734393463243, 'theta': 0.32721332665482533, 'vi': 0.31261460292535403}
import matplotlib.pyplot as plt
import numpy as np

theta = np.linspace(0,np.pi)
r = np.sin(theta)

fig = plt.figure(figsize=(7,5))
ax = fig.add_subplot(111, polar=True)
m = ax.scatter(0, data1['h_a'], s=200, alpha=0.75, label='Data', zorder=0)
m1 = ax.scatter(data1['theta'], data1['h_b'], s=150, alpha=0.75, marker='x', label='Model I')
m1 = ax.scatter(data2['theta'], data2['h_b'], s=150, alpha=0.75, marker='o', label='Model II')
m1 = ax.scatter(data3['theta'], data3['h_b'], s=150, alpha=0.75, marker='.', label='Model III')
# m1 = ax.scatter(theta4, b4, s=100, alpha=0.75, marker='o', label='Model II')
# ax.plot(0)
ax.set_ylim([0, 3])
# ax.set_xticks([0.1, 0.2, 0.3, 0.9])
# ax.set_xticklabels([1.0, 0.9, 0.8, 0.6, 0.3, 0.2, 0.1])
# m1 = ax.scatter(theta1, a, s=50, alpha=0.75)
# m1 = ax.scatter(theta1, a, s=50, alpha=0.75)
c = ax.plot(theta, data1['h_a'] * np.ones(theta.shape), color='black', linestyle='dashed', alpha=0.75)
ax.set_xlabel('Entropy', labelpad=20)
ax.set_ylabel('Entropy', labelpad=20)
plt.legend()
ax.set_thetamin(0)
ax.set_thetamax(90)
plt.tight_layout()
plt.savefig(SAVE_PATH + 'demo_vi.png')
plt.show()