Methods¶
In this notebook, I will walk through how we can estimate different similarity measures based on the density cubes that we derive.
import sys, os
from pyprojroot import here
root = here(project_files=[".here"])
sys.path.append(str(here()))
import pathlib
# standard python packages
import xarray as xr
import pandas as pd
import numpy as np
#
# Experiment Functions
from src.data.esdc import get_dataset
from src.features import Metrics
from src.features.temporal import select_period, get_smoke_test_time, TimePeriod
from src.features.spatial import select_region, get_europe, get_spain
from src.models.train_models import get_similarity_scores
from src.experiments.utils import dict_product, run_parallel_step
from src.features.density import get_density_cubes
from src.features.preprocessing import standardizer_data, get_reference_cube, get_common_indices
from src.models.similarity import cka_coefficient, rv_coefficient, rbig_it_measures
# # esdc tools
# from src.esdc.subset import select_pixel
# from src.esdc.shape import ShapeFileExtract, rasterize
# from esdc.transform import DensityCubes
from typing import List, Dict
import xarray as xr
from tqdm import tqdm
import cartopy
import cartopy.crs as ccrs
# NUMPY SETTINGS
import numpy as onp
onp.set_printoptions(precision=3, suppress=True)
# MATPLOTLIB Settings
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
# SEABORN SETTINGS
import seaborn as sns
sns.set_context(context='talk',font_scale=0.7)
# sns.set(rc={'figure.figsize': (12, 9.)})
# sns.set_style("whitegrid")
# PANDAS SETTINGS
import pandas as pd
pd.set_option("display.max_rows", 120)
pd.set_option("display.max_columns", 120)
# LOGGING SETTINGS
import sys
import logging
logging.basicConfig(
level=logging.INFO,
stream=sys.stdout,
format='%(asctime)s:%(levelname)s:%(message)s'
)
logger = logging.getLogger()
#logger.setLevel(logging.INFO)
%load_ext autoreload
%autoreload 2
1. Get DataCubes¶
!ls /media/disk/databases/ESDC/
# Datapath
DATA_PATH = pathlib.Path("/media/disk/databases/ESDC/")
# get filename
filename = DATA_PATH.joinpath("esdc-8d-0.25deg-1x720x1440-2.0.0.zarr")
from typing import List
import xarray as xr
def get_dataset(variable: str) -> xr.Dataset:
return xr.open_zarr(str(filename))[[variable]]
variable = 'soil_moisture'
datacube = get_dataset(variable)
datacube
2. Select Region¶
For this task, we are going to do something simple: work with only Europe and a segment of Eurasia. I have outlined a region described by its latitude and longitude coordinates. With these coordinates, we can subset a section of the cube and continue working with that region only.
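For reference, this kind of bounding-box subset boils down to an xarray .sel with coordinate slices. A minimal sketch, using illustrative bounds rather than the exact values returned by get_europe:
lon_min, lon_max = -10.0, 30.0   # illustrative longitudes, not the get_europe() values
lat_min, lat_max = 35.0, 60.0    # illustrative latitudes
europe_subset_sketch = datacube.sel(
    lon=slice(lon_min, lon_max),
    lat=slice(lat_max, lat_min),  # many ESDC grids store latitude in descending order
)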
from src.features.temporal import select_period
from src.features.spatial import select_region, get_europe
# get european bounding box
europe_bbox = get_europe()
time_period = ('July-2010', 'July-2010')
# subset region
europe_datacube = select_region(datacube, europe_bbox)
# subset time period
europe_datacube_201007 = select_period(europe_datacube, time_period)
europe_datacube_201007
europe_datacube_201007.soil_moisture.mean('time').plot(vmin=0, robust=True)
3. Get Density Cubes¶
Now, we are going to create some density cubes. Instead of just taking the raw samples, we are going to construct features: the neighbouring pixels of each point in a spatio-temporal window. For this demonstration, we will assume that neighbouring pixels in space and time carry information about the central pixel, so each sample becomes a small "minicube" of values.
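Concretely, assuming the minicubes simply stack every pixel in the window, a spatial window $S$ and a temporal window $T$ give each sample $d = S^2 \times T$ features; with the settings used below ($S = 1$, $T = 3$) each sample has $d = 3$ values, i.e. the pixel at three consecutive time steps.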
from typing import Iterator, Tuple
from src.features.preprocessing import DensityCubes
def get_density_cubes(data: xr.Dataset, spatial: int, temporal: int) -> Iterator[Tuple[str, pd.DataFrame]]:
    """Wrapper function to yield the density cubes for each variable in a dataset"""
    for ikey, idata in data.items():
        yield ikey, DensityCubes(
            spatial_window=spatial,
            time_window=temporal
        ).get_minicubes(idata)
# All samples
europe_df = europe_datacube_201007.to_dataframe().dropna()
# reorder index
levels = ['time', 'lon', 'lat']
europe_df = europe_df.reorder_levels(levels)
# europe_df = europe_df[indx]
europe_df.head()
spatial = 1
temporal = 3
ivar, europe_temp_df = next(get_density_cubes(
europe_datacube_201007,
spatial=spatial,
temporal=temporal
))
levels = ['time', 'lon', 'lat']
europe_temp_df = europe_temp_df.reorder_levels(levels)
europe_temp_df.head()
levels = ['time', 'lon', 'lat']
idx = europe_temp_df.index.intersection(europe_df.index)
idx.shape,
X_df = europe_df.loc[idx,:]
Y_df = europe_temp_df.loc[idx,:]
X_df.shape, Y_df.shape
4. Models Framework¶
4.1 Preprocessing¶
4.1.1 - Training and Testing¶
europe_temp_df.head()
# use the first density-cube column as the target and the remaining columns as inputs
y = europe_temp_df.iloc[:, 0].values[:, np.newaxis]
X = europe_temp_df.iloc[:, 1:]
d_dimensions = X.shape[1]
4.1.2 - Train-Test Split¶
from sklearn.model_selection import train_test_split
train_size = 2_000
random_state = 123
xtrain, xtest, ytrain, ytest = train_test_split(
X, y, train_size=train_size, random_state=random_state)
test_size = xtest.shape[0]
4.1.3 - Normalize¶
from sklearn.preprocessing import StandardScaler
# center the inputs (remove the mean only)
x_normalizer = StandardScaler(with_mean=True, with_std=False)
xtrain_norm = x_normalizer.fit_transform(xtrain)
xtest_norm = x_normalizer.transform(xtest)
# remove mean outputs
y_normalizer = StandardScaler(with_std=False)
ytrain_norm = y_normalizer.fit_transform(ytrain)
ytest_norm = y_normalizer.transform(ytest)
4.2 - Training¶
# from src.models.gp import SparseGPR
import GPy
from scipy.cluster.vq import kmeans2
# Kernel Function (RBF)
n_dims = xtrain_norm.shape[1]
kernel = GPy.kern.RBF(input_dim=n_dims, ARD=False)
# Inducing Points
n_inducing = 100
z = kmeans2(xtrain_norm, n_inducing, minit="points")[0]
# Initialize GP Model (on the normalized training data)
gp_model = GPy.models.SparseGPRegression(xtrain_norm, ytrain_norm, kernel=kernel, Z=z)
# choose VFE inference method
gp_model.inference_method = GPy.inference.latent_function_inference.VarDTC()
# fix variance to be low in the beginning
gp_model.Gaussian_noise.variance = 0.01
# optimize GP Model
n_restarts = 0
verbose = 1
max_iters = 1_000
# optimize
gp_model.optimize(
optimizer='scg',
messages=verbose,
max_iters=max_iters,
);
4.3 - Testing¶
ypred = gp_model.predict(xtest_norm, )[0]
ypred.shape, ytest_norm.shape
stats = Metrics().get_all(ypred.squeeze(), ytest_norm.squeeze())
stats
def _predict(model, Xs, batch_size):
    """Predict in small batches to avoid memory issues."""
    ms = []
    n = max(int(np.ceil(len(Xs) / batch_size)), 1)
    with tqdm(np.array_split(Xs, n)) as bar:
        for xs in bar:
            m = model.predict(xs)[0]  # keep the predictive mean only
            ms.append(m)
    return np.vstack(ms)
batch_size = 5_000
ms = []
n = max(int(np.ceil(len(xtest_norm) / batch_size)), 1)  # predict in small batches
with tqdm(np.array_split(xtest_norm, n)) as bar:
    for xs in bar:
        m = gp_model.predict(xs)[0]  # predictive mean
        ms.append(m)
ypred_batched = np.vstack(ms)
5. Direct Measurements¶
5.1 - $\rho V$ Coefficient¶
Now, we will explore the simplest linear method: a multi-dimensional extension of the Pearson correlation coefficient called the $\rho V$-coefficient ($\rho$-Vector coefficient). Most people are familiar with the correlation coefficient between two univariate variables:
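$$\rho(x, y) = \frac{\operatorname{cov}(x, y)}{\sigma_x \, \sigma_y}$$
where $\operatorname{cov}(x, y)$ is the covariance and $\sigma_x$, $\sigma_y$ are the standard deviations.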
This is very well known in the literature, but it doesn't directly apply to multi-dimensional data: the numerator and denominator yield a scalar per pair of dimensions, and there is no obvious way to summarize all of that information into a single value. One extension is to build the matrix of pairwise components (i.e. the Gram matrix) for each set of variables and then take the Frobenius norm (Hilbert-Schmidt norm) of the cross term as well as of the individual terms. The equation looks like this:
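$$\rho V(\mathbf{X}, \mathbf{Y}) = \frac{\langle \mathbf{X}\mathbf{X}^\top, \mathbf{Y}\mathbf{Y}^\top \rangle_F}{\|\mathbf{X}\mathbf{X}^\top\|_F \, \|\mathbf{Y}\mathbf{Y}^\top\|_F}$$
where $\mathbf{X}\mathbf{X}^\top$ and $\mathbf{Y}\mathbf{Y}^\top$ are the (centered) linear Gram matrices, $\langle \cdot, \cdot \rangle_F$ is the Frobenius inner product, and $\|\cdot\|_F$ is the Frobenius norm. This is exactly what the function below computes with a linear kernel plus kernel centering.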
Note: this is very similar to HSIC and Centered Kernel Alignment (CKA), but this method predates both. CKA generalizes it by allowing distance measures and non-linear kernel functions; we will explore this in the next section.
To code this up, we will return all of the components of this equation, because we will need them later.
from typing import Dict
from sklearn.preprocessing import KernelCenterer
from sklearn.metrics.pairwise import linear_kernel
def rv_coefficient(X: np.ndarray, Y: np.ndarray) -> Dict:
    """Simple function to calculate the RV coefficient"""
    # calculate the (linear) kernel matrices
    X_gram = linear_kernel(X)
    Y_gram = linear_kernel(Y)
    # center the kernels
    X_gram = KernelCenterer().fit_transform(X_gram)
    Y_gram = KernelCenterer().fit_transform(Y_gram)
    # normalizing coefficients (denominator)
    x_norm = np.linalg.norm(X_gram)
    y_norm = np.linalg.norm(Y_gram)
    # Frobenius inner product of the cross terms (numerator)
    xy_norm = np.sum(X_gram * Y_gram)
    # rv coefficient
    rv_coeff = xy_norm / x_norm / y_norm
    return {
        'coefficient': rv_coeff,
        'x_norm': x_norm,
        'y_norm': y_norm,
        'xy_norm': xy_norm
    }
X_samples = europe_temp_df.iloc[:, 0].values[:, np.newaxis]
Y_samples = europe_temp_df.values
logging.info(f"Sizes - X_samples: {X_samples.shape}, Y_samples: {Y_samples.shape}")
d_dimensions = X.shape[1]
# check that the coefficient is 1 if the data is the same
rv_coeff = rv_coefficient(X_samples[:100], X_samples[:100])
np.testing.assert_almost_equal(rv_coeff['coefficient'], 1)
%%time
rv_coeff = rv_coefficient(X_samples[:], Y_samples[:])
rv_coeff
5.2 - Non-Linear Kernel¶
One addition we can make is to explore how the measure behaves when we swap the linear kernel for a non-linear (RBF) kernel, which gives us a kernel alignment (CKA) coefficient. To do so, we first need a heuristic for estimating the kernel length scale, $\sigma$.
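Concretely, the coefficient keeps the same form as the $\rho V$-coefficient above, but the linear Gram matrices are replaced by (centered) RBF kernel matrices with length scales $\sigma_x$ and $\sigma_y$ estimated from the data:
$$\text{CKA}(\mathbf{X}, \mathbf{Y}) = \frac{\langle \mathbf{K}_x, \mathbf{K}_y \rangle_F}{\|\mathbf{K}_x\|_F \, \|\mathbf{K}_y\|_F}, \qquad (\mathbf{K}_x)_{ij} = \exp\left(-\frac{\|x_i - x_j\|^2}{2\sigma_x^2}\right)$$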
from typing import Optional
from scipy.spatial.distance import pdist, squareform
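# NOTE: estimate_sigma below refers to silvermans_factor and scotts_factor, which are not
# defined in this notebook. A minimal sketch is given here, assuming they implement the
# standard Scott / Silverman KDE bandwidth rules (the original helpers may differ).
def scotts_factor(X: np.ndarray) -> float:
    """Scott's rule of thumb: n ** (-1 / (d + 4))."""
    n_samples, n_features = X.shape
    return np.power(n_samples, -1.0 / (n_features + 4))
def silvermans_factor(X: np.ndarray) -> float:
    """Silverman's rule of thumb: (n * (d + 2) / 4) ** (-1 / (d + 4))."""
    n_samples, n_features = X.shape
    return np.power(n_samples * (n_features + 2) / 4.0, -1.0 / (n_features + 4))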
def estimate_sigma(
X: np.ndarray,
method: str='median',
percent: Optional[int]=None,
heuristic: bool=False
) -> float:
# get the squared euclidean distances
if method == 'silverman':
return silvermans_factor(X)
elif method == 'scott':
return scotts_factor(X)
elif percent is not None:
kth_sample = int((percent/100) * X.shape[0])
dists = np.sort(squareform(pdist(X, 'sqeuclidean')))[:, kth_sample]
# print(dists.shape, dists.min(), dists.max())
else:
dists = np.sort(pdist(X, 'sqeuclidean'))
# print(dists.shape, dists.min(), dists.max())
if method == 'median':
sigma = np.median(dists)
elif method == 'mean':
sigma = np.mean(dists)
else:
raise ValueError(f"Unrecognized distance measure: {method}")
if heuristic:
sigma = np.sqrt(sigma / 2)
return sigma
from typing import Dict
from sklearn.preprocessing import KernelCenterer
from sklearn.gaussian_process.kernels import RBF
def cka_coefficient(X: np.ndarray, Y: np.ndarray) -> Dict:
    """Simple function to calculate the (kernel) CKA coefficient"""
    # estimate the kernel length scales (sigmas)
    sigma_X = estimate_sigma(X, method='median', percent=50)
    sigma_Y = estimate_sigma(Y, method='median', percent=50)
    # calculate the RBF kernel matrices
    X_gram = RBF(sigma_X)(X)
    Y_gram = RBF(sigma_Y)(Y)
    # center the kernels
    X_gram = KernelCenterer().fit_transform(X_gram)
    Y_gram = KernelCenterer().fit_transform(Y_gram)
    # normalizing coefficients (denominator)
    x_norm = np.linalg.norm(X_gram)
    y_norm = np.linalg.norm(Y_gram)
    # Frobenius inner product of the cross terms (numerator)
    xy_norm = np.sum(X_gram * Y_gram)
    # cka coefficient
    cka_coeff = xy_norm / x_norm / y_norm
    return {
        'coefficient': cka_coeff,
        'x_norm': x_norm,
        'y_norm': y_norm,
        'xy_norm': xy_norm
    }
# check that the coefficient is 1 if the data is the same
cka_coeff = cka_coefficient(X_samples[:100], X_samples[:100])
np.testing.assert_almost_equal(cka_coeff['coefficient'], 1)
%%time
cka_coeff = cka_coefficient(X_samples[:10_000], Y_samples[:10_000])
cka_coeff
5.3 - Variation of Information¶
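With RBIG we can estimate the entropies $H(X)$ and $H(Y)$ and the mutual information $I(X, Y)$ directly from samples, and summarize them with the normalized coefficient used below (note that this is a normalized mutual-information score rather than the variation of information in the strict sense, $H(X) + H(Y) - 2I(X, Y)$):
$$\nu(X, Y) = \frac{I(X, Y)}{\sqrt{H(X)\, H(Y)}}$$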
from rbig.rbig import RBIGMI, RBIG
rbig_results = {}
def variation_of_info(H_X, H_Y, I_XY):
return I_XY / np.sqrt(H_X) / np.sqrt(H_Y)
%%time
n_layers = 10000
rotation_type = 'PCA'
random_state = 0
zero_tolerance = 60
pdf_extension = None
pdf_resolution = None
tolerance = None
# Initialize RBIG class
H_rbig_model = RBIG(n_layers=n_layers,
rotation_type=rotation_type,
random_state=random_state,
zero_tolerance=zero_tolerance,
tolerance=tolerance)
# fit model to the data
rbig_results['H_x'] = H_rbig_model.fit(X_samples).entropy(correction=True);
rbig_results['H_y'] = H_rbig_model.fit(Y_samples).entropy(correction=True);
# Initialize RBIG class
I_rbig_model = RBIGMI(n_layers=n_layers,
rotation_type=rotation_type,
random_state=random_state,
zero_tolerance=zero_tolerance,
tolerance=tolerance)
# fit model to the data
rbig_results['I_xy'] = I_rbig_model.fit(X_samples, Y_samples).mutual_information();
# calculate the variation of information coefficient
rbig_results['coefficient'] = variation_of_info(
rbig_results['H_x'],
rbig_results['H_y'],
rbig_results['I_xy']
)
rbig_results