Spatial-Temporal Experiment¶
In this notebook, I will walk through how we can estimate probabilities and information measures with different methods, based on the density cubes that we derive.
import sys, os
from pyprojroot import here
root = here(project_files=[".here"])
sys.path.append(str(here()))
# sys.path.insert(0, "/home/emmanuel/code/rbig/")
sys.path.append('/home/emmanuel/code/rbig/')
import pathlib
# standard python packages
import xarray as xr
import pandas as pd
import numpy as np
#
# Experiment Functions
from src.data.esdc import get_dataset
from src.features import Metrics
from sklearn.preprocessing import StandardScaler
# from src.models.density import get_rbig_model
from src.features.temporal import select_period, get_smoke_test_time, TimePeriod
from src.features.spatial import select_region, get_europe, get_spain
from src.experiments.utils import dict_product, run_parallel_step
from src.visualization.spatemp.info_earth.demo_spain import plot_map, plot_monthly_map, plot_ts, plot_ts_error
from src.features.density import get_density_cubes
from src.features.preprocessing import standardizer_data, get_reference_cube, get_common_indices
from src.models.utils import subset_indices
# # esdc tools
# from src.esdc.subset import select_pixel
# from src.esdc.shape import ShapeFileExtract, rasterize
# from esdc.transform import DensityCubes
from typing import Dict, List, Tuple
from tqdm import tqdm
import time
import cartopy
import cartopy.crs as ccrs
# NUMPY SETTINGS
import numpy as onp
onp.set_printoptions(precision=3, suppress=True)
# MATPLOTLIB Settings
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
# SEABORN SETTINGS
import seaborn as sns
sns.set_context(context='talk',font_scale=0.7)
# sns.set(rc={'figure.figsize': (12, 9.)})
# sns.set_style("whitegrid")
# PANDAS SETTINGS
import pandas as pd
pd.set_option("display.max_rows", 120)
pd.set_option("display.max_columns", 120)
# LOGGING SETTINGS
import sys
import logging
logging.basicConfig(
    level=logging.INFO,
    stream=sys.stdout,
    format='%(asctime)s: %(levelname)s: %(message)s'
)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
%load_ext autoreload
%autoreload 2
# !pip install --ignore-installed "git+https://github.com/dcs4cop/xcube.git#egg=xcube" --pre
# !pip install kdepy
Experiment Steps¶
Global Variables¶
Parameters¶
parameters = {}
parameters['variable'] = [
    'precipitation',
    'gross_primary_productivity',
    'land_surface_temperature',
    'root_moisture',
    'soil_moisture',
]
parameters['region'] = [None]#[get_europe()]
parameters['period'] = [TimePeriod(name="2010", start="Jan-2010", end="Dec-2010")]
parameters['spatial'] = [1]#[1, 7, 3, 1]
parameters['temporal'] = [6]#[1, 1, 6, 46]
params = list(dict_product(parameters))
print(len(params))
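For reference, dict_product is a small project helper; the assumed behavior is a cartesian product over a dict of lists, yielding one parameter dict per combination. A minimal sketch of that assumed behavior using only itertools (a hypothetical stand-in, not the project's implementation):
# Minimal sketch of the assumed dict_product behavior (hypothetical helper).
from itertools import product

def dict_product_sketch(d: dict):
    for combo in product(*d.values()):
        yield dict(zip(d.keys(), combo))

# e.g. 5 variables x 1 region x 1 period x 1 spatial x 1 temporal -> 5 dicts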
smoke_test = True
params = params[0]  # smoke test: run a single parameter configuration
from rbig.layers.rbig_layer import RBIGLayer
from rbig.stopping import InfoLoss
from rbig.transform.gaussianization import MarginalGaussianization
from rbig.transform.uniformization import KDEUniformization, HistogramUniformization
from rbig.transform.linear import OrthogonalTransform
from rbig.models import GaussianizationModel, RBIG
# ======================
# experiment - Data
# ======================
# Get DataCube
logging.info(f"Loading '{params['variable']}' variable")
datacube = get_dataset([params['variable']])
datacube = datacube.resample(time='1MS').mean()
# subset datacube (spatially)
if params['region'] is not None:
    logging.info(f"Selecting region '{params['region'].name}'")
    datacube = select_region(
        xr_data=datacube, bbox=params['region']
    )
# subset datacube (temporally)
logging.info(f"Selecting temporal period: '{params['period'].name}'")
datacube = select_period(xr_data=datacube, period=params['period'])
# # get datacubes
# reference_cube_df = get_reference_cube(data=datacube)
# get density cubes
logging.info(f"Getting density cubes: S: {params['spatial']}, T: {params['temporal']}")
density_cube_df = get_density_cubes(
    data=datacube[params['variable']],
    spatial=params['spatial'],
    temporal=params['temporal']
)
logging.info(f"Total data: {density_cube_df.shape}")
# standardize data
logging.info(f"Standardizing data...")
x_transformer = StandardScaler().fit(density_cube_df.values)
density_cube_df_norm = pd.DataFrame(
    data=x_transformer.transform(density_cube_df.values),
    columns=density_cube_df.columns.values,
    index=density_cube_df.index
)
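A quick sanity check that the standardization behaved as expected (each column roughly zero mean, unit variance):
# Columns should be ~N(0, 1) after the StandardScaler transform.
assert onp.allclose(density_cube_df_norm.mean().values, 0.0, atol=1e-3)
assert onp.allclose(density_cube_df_norm.std(ddof=0).values, 1.0, atol=1e-3)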
def get_rbig_model():
    # marginal uniformization via histograms
    uniform_clf = HistogramUniformization(bins=50, support_extension=30, alpha=1e-4, n_quantiles=None)
    # uniform_clf = KDEUniformization(n_quantiles=50, method='fft', )
    # marginal Gaussianization (uniformize, then map to a Gaussian)
    mg_gaussianization = MarginalGaussianization(uniform_clf)
    # random orthogonal rotation between layers
    orth_transform = OrthogonalTransform('random_o')
    # one RBIG layer: marginal Gaussianization followed by a rotation
    rbig_block = RBIGLayer(mg_gaussianization, orth_transform)
    # stop stacking layers once they stop adding information
    rbig_loss = InfoLoss(tol_layers=20, method='histogram', p_value=0.25)
    rbig_model = GaussianizationModel(rbig_block, rbig_loss, verbose=True)
    return rbig_model
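Before fitting on the real data, a quick smoke test of the pipeline this factory builds; this uses only the fit/score_samples API exercised below:
# Smoke test on heavy-tailed toy data.
toy = onp.random.randn(1_000, 2) ** 3
toy_model = get_rbig_model().fit(X=toy)
toy_logp = toy_model.score_samples(toy)  # per-sample log-likelihoods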
# =========================
# Model - Gaussianization
# =========================
logger.setLevel(logging.INFO)
rbig_model = get_rbig_model()
subsamples = 100_000
subset = subset_indices(density_cube_df_norm.values, subsamples)
logging.info(f"subset: {subset.shape}")
# Gaussianize the data
logging.info(f"Gaussianizing data...")
t0 = time.time()
rbig_model = rbig_model.fit(X=subset)
t1 = time.time() - t0
logging.info(f"Time Taken: {t1:.2f} secs")
from sklearn.utils import gen_batches
from joblib import delayed, Parallel
import joblib
slices = list(gen_batches(density_cube_df_norm.values.shape[0], 1000))
density_cube_df_norm.values[slices[0]].shape, len(slices)
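gen_batches(n, batch_size) from scikit-learn yields contiguous slice objects that tile range(n), which is what makes the batched scoring below work:
list(gen_batches(10, 4))
# -> [slice(0, 4, None), slice(4, 8, None), slice(8, 10, None)]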
from dask.distributed import Client
client = Client(processes=False)
client
import dask.dataframe as dd
df_dask = dd.from_pandas(density_cube_df_norm.reset_index(), chunksize=2_000)
df_dask
df_dask[['var_x0', 'var_x1', 'var_x2', 'var_x3', 'var_x4', 'var_x5']].values
# from dask_ml.wrappers import ParallelPostFit
# clf = ParallelPostFit(rbig_model)  # unused dask-ml experiment (import commented out above)
with joblib.parallel_backend('dask'):
    jobs = (delayed(rbig_model.score_samples)(
        density_cube_df_norm.values[islice]) for islice in slices
    )
    parallel = Parallel(verbose=1)
    results = parallel(jobs)
from dask.distributed import Client
client = Client("tcp://127.0.0.1:58499")
client
jobs = (delayed(rbig_model.score_samples)(
    density_cube_df_norm.values[islice]) for islice in slices
)
parallel = Parallel(n_jobs=8, verbose=2)
results = parallel(jobs)
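Each parallel job returns the log-likelihoods for one batch; because the slices are contiguous and ordered, the batch results can simply be concatenated to recover the full array (an alternative to the single full-array call in the next cell):
# Stitch the batched scores back together (batches are contiguous and ordered).
X_logprob_batched = onp.concatenate([onp.asarray(r).squeeze() for r in results])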
# get the probability estimates
logging.info(f"Getting probability estimates...")
t0 = time.time()
# subsamples = 500_000
# subset = subset_indices(density_cube_df_norm.values, subsamples)
X_prob = rbig_model.score_samples(density_cube_df_norm.values)
t1 = time.time() - t0
logging.info(f"Time Taken: {t1:.2f} secs")
X_prob = pd.DataFrame(
    data=X_prob.squeeze(),
    index=density_cube_df_norm.index,
    columns=['log_probability']
)
# nll = rbig_model.score(subset.values)
# logging.info(f"Log Likelihood: {nll:.4f}")
def get_information(df):
    df = df.reset_index().set_index(['time', 'lat', 'lon'])
    # convert log-probabilities to probabilities
    df['probability'] = np.exp(df['log_probability'])
    # create xarray cubes
    probs_cubes = xr.Dataset.from_dataframe(df)
    # resample monthly
    probs_cubes = probs_cubes.resample(time='1MS').mean()
    # shannon information (in bits): -log2 p = -ln p / ln 2
    probs_cubes['shannon_info'] = -np.log(probs_cubes['probability']) / np.log(2)
    return probs_cubes
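Note the unit conversion in the last line: Shannon information in bits is -log2 p(x) = -ln p(x) / ln 2, hence the division by np.log(2). A one-line check:
p = onp.array([0.5, 0.25, 0.1])
assert onp.allclose(-onp.log2(p), -onp.log(p) / onp.log(2))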
probs_cubes = get_information(X_prob)
fig, ax = plt.subplots()
probs_cubes.probability.mean(dim='time').plot(
    ax=ax,
    vmin=0, robust=True, cmap='Reds',
    cbar_kwargs={'label': "Probability"}
)
fig, ax = plt.subplots()
probs_cubes.log_probability.mean(dim='time').plot(
    ax=ax, robust=True, cmap='Reds',
    cbar_kwargs={'label': "Log Probability"}
)
fig, ax = plt.subplots()
probs_cubes.shannon_info.mean(dim='time').plot(
    ax=ax,
    vmin=0, robust=True, cmap='Reds',
    cbar_kwargs={'label': "Shannon Information"}
)
rbig_model.total_correlation()
variables = [
    'root_moisture',
    'soil_moisture',
    'gross_primary_productivity',
    'land_surface_temperature',
    'precipitation',
]
datacube = get_dataset(variables)
datacube
mean_cube = datacube.mean(dim=['lat', 'lon']).sel(time=slice('2002', '2010'))
mean_cube = (mean_cube - mean_cube.mean()) / mean_cube.std()
fig, ax = plt.subplots()
for ivariable in variables:
    mean_cube[ivariable].plot.line(ax=ax, label=ivariable)
ax.legend()
plt.tight_layout()
plt.show()
Experimental Step¶
datacube = get_dataset([params['variable']])
datacube
def step(params: Dict, smoke_test: bool = False):
    # ======================
    # experiment - Data
    # ======================
    # Get DataCube
    logging.info(f"Loading '{params['variable']}' variable")
    datacube = get_dataset([params['variable']])
    # subset datacube (spatially)
    if params['region'] is not None:
        logging.info(f"Selecting region '{params['region'].name}'")
        datacube = select_region(
            xr_data=datacube, bbox=params['region']
        )
    # subset datacube (temporally)
    logging.info(f"Selecting temporal period: '{params['period'].name}'")
    datacube = select_period(xr_data=datacube, period=params['period'])
    # get density cubes
    logging.info(f"Getting density cubes: S: {params['spatial']}, T: {params['temporal']}")
    density_cube_df = get_density_cubes(
        data=datacube[params['variable']],
        spatial=params['spatial'],
        temporal=params['temporal']
    )
    logging.info(f"Total data: {density_cube_df.shape}")
    # standardize data
    logging.info("Standardizing data...")
    x_transformer = StandardScaler().fit(density_cube_df.values)
    density_cube_df_norm = pd.DataFrame(
        data=x_transformer.transform(density_cube_df.values),
        columns=density_cube_df.columns.values,
        index=density_cube_df.index
    )
    # =========================
    # Model - Gaussianization
    # =========================
    # Gaussianize the data
    logging.info("Gaussianizing data...")
    t0 = time.time()
    rbig_model = get_rbig_model().fit(X=density_cube_df_norm.values)
    t1 = time.time() - t0
    logging.info(f"Time Taken: {t1:.2f} secs")
    # get the probability estimates (score_samples returns log-likelihoods)
    logging.info("Getting probability estimates...")
    t0 = time.time()
    X_logprob = rbig_model.score_samples(density_cube_df_norm.values)
    t1 = time.time() - t0
    logging.info(f"Time Taken: {t1:.2f} secs")
    X_prob = pd.DataFrame(
        data=np.exp(X_logprob.squeeze()),
        index=density_cube_df_norm.index,
        columns=['probability']
    )
    return density_cube_df, rbig_model, x_transformer, X_prob
density_cube_df, rbig_model, X_transformer, X_prob = step(params, smoke_test=True)
density_cube_df.head()
plt.hist(density_cube_df['var_x0'].squeeze(), bins=50, density=True);
plt.hist(X_prob['probability'].squeeze(), bins=50, range=(0, 1.0));
datacube[params['variable']].mean(dim='time').plot(vmin=0)
plt.tight_layout()
Extra Features¶
In the previous example, I took the data as is: the rows were just raw samples, and I applied the density estimator directly. For this example, I will add some extra features and then compare how the probability estimates change.
parameters = {}
parameters['variable'] = 'root_moisture'
# other options: 'soil_moisture', 'gross_primary_productivity',
# 'land_surface_temperature', 'precipitation'
parameters['region'] = get_spain()
parameters['period'] = TimePeriod(name="200201_201012", start="Jan-2002", end="Dec-2010")
parameters['spatial'] = 3
parameters['temporal'] = 2
density_cube_df, rbig_model, X_transformer, X_prob = step(parameters, smoke_test=True)
X_prob.head()
X_prob.shape, X_prob.max(), X_prob.min()
plt.hist(X_prob['probability'][X_prob['probability'] < 1.], bins=100)
from prefect import task, Flow, Parameter
import shapely.geometry
# assumed import paths: clip_dataset_by_geometry ships with xcube (see the
# commented pip install at the top); Region is the project's bounding-box type
from xcube.core.geom import clip_dataset_by_geometry
from src.features.spatial import Region

@task  # get Dataset
def get_dataset(variable: str) -> xr.Dataset:
    # NOTE: `filename` is assumed to point at the zarr datacube (defined elsewhere)
    return xr.open_zarr(str(filename))[[variable]]
@task  # subset datacube
def cube_spatial_subset(xr_data: xr.Dataset, bbox: Region) -> xr.Dataset:
    """Function to spatially subset an xarray dataset from a bounding box."""
    # get bounding box
    bbox = shapely.geometry.box(
        bbox.lonmin,
        bbox.latmin,
        bbox.lonmax,
        bbox.latmax
    )
    # subset datacube
    return clip_dataset_by_geometry(xr_data, bbox)
@task  # subset datacube (temporally)
def cube_temporal_subset(xr_data: xr.DataArray, period: TimePeriod) -> xr.DataArray:
    """Function to temporally subset an xarray dataset from a period's
    start date and end date.
    """
    return xr_data.sel(time=slice(period.start, period.end))
# index order shared by the reference and density cubes (assumed)
levels = ['time', 'lat', 'lon']

@task  # get reference cube
def get_reference_cube(data: xr.DataArray) -> pd.DataFrame:
    """Wrapper Function to get reference cube"""
    return data.to_dataframe().dropna().reorder_levels(levels)
@task  # get density cubes
def get_density_cubes(data: xr.DataArray, spatial: int, temporal: int) -> pd.DataFrame:
    """Wrapper Function to get density cubes from a dataarray"""
    # DensityCubes comes from the esdc tools (commented import at the top)
    return DensityCubes(
        spatial_window=spatial,
        time_window=temporal
    ).get_minicubes(data).reorder_levels(levels)
@task  # get common indices (see the small illustration after these task definitions)
def get_common_indices(
    reference_df: pd.DataFrame, density_df: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    idx = density_df.index.intersection(reference_df.index)
    return reference_df.loc[idx, :], density_df.loc[idx, :]
@task  # standardize the data before
def standardizer_data(X: pd.DataFrame, Y: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    # standardizer
    normalizer = StandardScaler(with_mean=True, with_std=True)
    # standardize X values
    X_values = normalizer.fit_transform(X.values)
    X = pd.DataFrame(data=X_values, index=X.index, columns=X.columns)
    # standardize Y values
    Y_values = normalizer.fit_transform(Y.values)
    Y = pd.DataFrame(data=Y_values, index=Y.index, columns=Y.columns)
    return X, Y
@task
def get_similarity_scores(X_ref: pd.DataFrame, Y_compare: pd.DataFrame) -> Dict:
    # rv_coefficient / rbig_it_measures are assumed to come from the
    # project's similarity utilities
    # RV Coefficient
    rv_results = rv_coefficient(X_ref, Y_compare)
    # # CKA Coefficient
    # cka_results = cka_coefficient(X_ref, Y_compare)
    # RBIG Coefficient
    rbig_results = rbig_it_measures(X_ref, Y_compare)
    results = {
        **rv_results,
        # **cka_results,
        **rbig_results
    }
    return results
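As a small illustration of what get_common_indices does: Index.intersection keeps only the row labels present in both frames, so the two returned dataframes are row-aligned:
a = pd.DataFrame({'x': [1, 2, 3]}, index=[0, 1, 2])
b = pd.DataFrame({'y': [20, 30]}, index=[1, 2])
idx = b.index.intersection(a.index)
a.loc[idx, :], b.loc[idx, :]  # both aligned to rows 1 and 2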
Experiment Run¶
# variable = 'gross_primary_productivity'
# region = get_europe()
# datacube = get_dataset(variable)
# datacube = subset_cube(xr_data=datacube, bbox=region)
logger.setLevel(logging.INFO)
with Flow("Experiment-Step") as flow:
    # ======================
    # experiment parameters
    # ======================
    variable = Parameter("variable", default='gross_primary_productivity')
    region = Parameter("region", default=get_europe())
    period = Parameter("period", default=get_smoke_test_time())
    spatial = Parameter("spatial", default=1)
    temporal = Parameter("temporal", default=3)
    # ======================
    # experiment - Data
    # ======================
    # Get DataCube
    datacube = get_dataset(variable)
    # subset datacube (spatially)
    datacube = cube_spatial_subset(xr_data=datacube, bbox=region)[variable]
    # subset datacube (temporally)
    datacube = cube_temporal_subset(xr_data=datacube, period=period)
    # get datacubes
    reference_cube_df = get_reference_cube(data=datacube)
    # get density cubes
    density_cube_df = get_density_cubes(
        data=datacube,
        spatial=spatial,
        temporal=temporal
    )
    # get reference dataframe
    dfs = get_common_indices(
        reference_df=reference_cube_df,
        density_df=density_cube_df
    )
    # standardize data
    dfs = standardizer_data(X=dfs[0], Y=dfs[1])
    # ======================
    # experiment - Methods
    # ======================
    res = get_similarity_scores(X_ref=dfs[0], Y_compare=dfs[1])
state = flow.run()
state.result[res].result