
Spatial-Temporal Experiment

In this notebook, I walk through how we can estimate densities and derived information measures (probabilities, Shannon information, total correlation) from the spatial-temporal density cubes that we derive.
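Before the code, here is a minimal sketch of what I mean by a "density cube" (my illustration of the idea only; the notebook uses the real implementation in src.features.density.get_density_cubes): every spatial-temporal neighbourhood of the data cube is flattened into one sample, so a single variable becomes a multivariate dataset whose dimensionality equals the neighbourhood size.

# Toy version of the density-cube construction (illustration only).
# Each (temporal, spatial, spatial) window of the cube is flattened
# into one sample of dimension spatial * spatial * temporal.
import numpy as np

def density_cubes_sketch(cube: np.ndarray, spatial: int, temporal: int) -> np.ndarray:
    T, H, W = cube.shape  # (time, lat, lon)
    samples = [
        cube[t:t + temporal, i:i + spatial, j:j + spatial].ravel()
        for t in range(T - temporal + 1)
        for i in range(H - spatial + 1)
        for j in range(W - spatial + 1)
    ]
    return np.asarray(samples)

# e.g. spatial=1, temporal=6 yields the (n_samples, 6) matrix logged later.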

import sys, os
from pyprojroot import here
root = here(project_files=[".here"])
sys.path.append(str(here()))
# sys.path.insert(0, "/home/emmanuel/code/rbig/")
sys.path.append('/home/emmanuel/code/rbig/')

import pathlib

# standard python packages
import xarray as xr
import pandas as pd
import numpy as np

# Experiment Functions
from src.data.esdc import get_dataset
from src.features import Metrics
from sklearn.preprocessing import StandardScaler
# from src.models.density import get_rbig_model
from src.features.temporal import select_period, get_smoke_test_time, TimePeriod
from src.features.spatial import select_region, get_europe, get_spain
from src.experiments.utils import dict_product, run_parallel_step
from src.visualization.spatemp.info_earth.demo_spain import plot_map, plot_monthly_map, plot_ts, plot_ts_error

from src.features.density import get_density_cubes
from src.features.preprocessing import standardizer_data, get_reference_cube, get_common_indices
from src.models.utils import subset_indices
# # esdc tools
# from src.esdc.subset import select_pixel
# from src.esdc.shape import ShapeFileExtract, rasterize
# from esdc.transform import DensityCubes

from typing import List, Dict

from tqdm import tqdm
import time

import cartopy
import cartopy.crs as ccrs

# NUMPY SETTINGS
import numpy as onp
onp.set_printoptions(precision=3, suppress=True)

# MATPLOTLIB Settings
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

# SEABORN SETTINGS
import seaborn as sns
sns.set_context(context='talk',font_scale=0.7)
# sns.set(rc={'figure.figsize': (12, 9.)})
# sns.set_style("whitegrid")

# PANDAS SETTINGS
import pandas as pd
pd.set_option("display.max_rows", 120)
pd.set_option("display.max_columns", 120)

# LOGGING SETTINGS
import sys
import logging
logging.basicConfig(
    level=logging.INFO, 
    stream=sys.stdout,
    format='%(asctime)s: %(levelname)s: %(message)s'
)
logger = logging.getLogger()
logger.setLevel(logging.INFO)

%load_ext autoreload
%autoreload 2
# !pip install --ignore-installed "git+https://github.com/dcs4cop/xcube.git#egg=xcube" --pre
# !pip install kdepy

Experiment Steps

Global Variables

Parameters

parameters = {}
parameters['variable'] = [
    'precipitation',
    'gross_primary_productivity',
    'land_surface_temperature',
    'root_moisture',
    'soil_moisture',
    'gross_primary_productivity',
    'precipitation',
]
parameters['region'] = [None]#[get_europe()]
parameters['period'] = [TimePeriod(name="2010", start="Jan-2010", end="Dec-2010")]
parameters['spatial'] = [1]#[1, 7, 3, 1]
parameters['temporal'] = [6]#[1, 1, 6, 46]

params = list(dict_product(parameters))
print(len(params))
smoke_test = True

params = params[0]
7
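For reference, dict_product comes from src.experiments.utils; assuming it is the usual Cartesian-product helper (an assumption, the real code may differ in details), a minimal equivalent looks like this, which also explains the printed 7: seven variables times one choice of everything else.

# Minimal stand-in for src.experiments.utils.dict_product (assumption:
# the real helper is equivalent). Expands a dict of option lists into
# one dict per combination of options.
import itertools
from typing import Dict, Iterator, List

def dict_product_sketch(options: Dict[str, List]) -> Iterator[Dict]:
    keys = list(options)
    for combo in itertools.product(*options.values()):
        yield dict(zip(keys, combo))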
from rbig.layers.rbig_layer import RBIGLayer
from rbig.stopping import InfoLoss
from rbig.transform.gaussianization import MarginalGaussianization
from rbig.transform.uniformization import KDEUniformization, HistogramUniformization
from rbig.transform.linear import OrthogonalTransform
from rbig.models import GaussianizationModel, RBIG

%autoreload 2
# ======================
# experiment - Data
# ======================
# Get DataCube
logging.info(f"Loading '{params['variable']}' variable")
datacube = get_dataset([params['variable']])
datacube = datacube.resample(time='1MS').mean()

#     t = clip_dataset_by_geometry(datacube, get_spain())

#     datacube = select_region(
#         xr_data=datacube, bbox=get_spain()
#     )[params['variable']]
#     return t

# subset datacube (spatially)
if params['region'] is not None:
    logging.info(f"Selecting region '{params['region'].name}'")
    datacube = select_region(
        xr_data=datacube, bbox=params['region']
    )

# subset datacube (temporally)
logging.info(f"Selecting temporal period: '{params['period'].name}'")
datacube = select_period(xr_data=datacube, period=params['period'])

#     # get datacubes
#     reference_cube_df = get_reference_cube(data=datacube)

# get density cubes
logging.info(f"Getting density cubes: S: {params['spatial']}, T: {params['temporal']}")
density_cube_df = get_density_cubes(
    data=datacube[params['variable']], 
    spatial=params['spatial'], 
    temporal=params['temporal']
)
logging.info(f"Total data: {density_cube_df.shape}")
#     print(density_cube_df.head())

#     # get reference dataframe
#     X, Y = get_common_indices(
#         reference_df=reference_cube_df, 
#         density_df=density_cube_df
#     )
#     print(density_cube_df.values.shape)
#     print(density_cube_df.columns)
# standardize data
logging.info(f"Standardizing data...")
x_transformer = StandardScaler().fit(density_cube_df.values)

density_cube_df_norm = pd.DataFrame(
    data=x_transformer.transform(density_cube_df.values),
    columns=density_cube_df.columns.values,
    index=density_cube_df.index
)


# # get the probability estimates
# logging.info(f"Getting probability estimates...")
# t0 = time.time()
# X_prob = rbig_model.predict_proba(
#     density_cube_df_norm.values, n_trials=20, chunksize=10_000, domain='input'
# )
# t1 = time.time() - t0
# logging.info(f"Time Taken: {t1:.2f} secs")

# X_prob = pd.DataFrame(
#     data=X_prob, 
#     index=density_cube_df_norm.index,
#     columns=['probability']
# )
2020-05-12 23:24:19,438: INFO: Loading 'precipitation' variable
2020-05-12 23:24:22,675: INFO: Selecting temporal period: '2010'
2020-05-12 23:24:22,682: INFO: Getting density cubes: S: 1, T: 6
/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/dask/array/numpy_compat.py:40: RuntimeWarning: invalid value encountered in true_divide
  x = np.divide(x1, x2, out)
2020-05-12 23:24:23,606: INFO: Total data: (2605212, 6)
2020-05-12 23:24:23,607: INFO: Standardizing data...
def get_rbig_model():
    # marginal uniformization via smoothed histograms
    uniform_clf = HistogramUniformization(bins=50, support_extension=30, alpha=1e-4, n_quantiles=None)
#     uniform_clf = KDEUniformization(n_quantiles=50, method='fft', )
    # per-dimension gaussianization: uniformize, then inverse Gaussian CDF
    mg_gaussianization = MarginalGaussianization(uniform_clf)
    # random orthogonal rotation to mix dimensions between layers
    orth_transform = OrthogonalTransform('random_o')
    rbig_block = RBIGLayer(mg_gaussianization, orth_transform)
    # stop once the per-layer information loss stays negligible for tol_layers layers
    rbig_loss = InfoLoss(tol_layers=20, method='histogram', p_value=0.25)
    rbig_model = GaussianizationModel(rbig_block, rbig_loss, verbose=True)
    return rbig_model
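For intuition, each RBIG layer pairs a marginal Gaussianization with an orthogonal rotation. Below is a rough numpy sketch of one such layer (my sketch, not the rbig package: the real layers use the smoothed-histogram uniformization configured above and also track log-Jacobians so that densities can be evaluated exactly).

# Rough sketch of a single RBIG layer: marginally gaussianize each
# dimension via the empirical CDF, then mix dimensions with a random
# orthogonal rotation. Illustration only.
import numpy as np
from scipy import stats

def rbig_layer_sketch(X: np.ndarray, rng=np.random.default_rng(0)) -> np.ndarray:
    n, d = X.shape
    # marginal uniformization via ranks (empirical CDF), mapped into (0, 1)
    U = stats.rankdata(X, axis=0) / (n + 1)
    # inverse Gaussian CDF makes each marginal approximately N(0, 1)
    Z = stats.norm.ppf(U)
    # random orthogonal rotation (QR of a Gaussian matrix) mixes dimensions
    Q, _ = np.linalg.qr(rng.normal(size=(d, d)))
    return Z @ Q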
# =========================
# Model - Gaussianization
# =========================
logger.setLevel(logging.INFO)
rbig_model = get_rbig_model()

subsamples = 100_000
subset = subset_indices(density_cube_df_norm.values, subsamples)
logging.info(f"subset: {subset.shape}")
# Gaussianize the data
logging.info(f"Gaussianizing data...")
t0 = time.time()
rbig_model = rbig_model.fit(X=subset)
t1 = time.time() - t0
logging.info(f"Time Taken: {t1:.2f} secs")
2020-05-12 23:24:26,528: INFO: subset: (100000, 6)
2020-05-12 23:24:26,529: INFO: Gaussianizing data...
Layer: 1, Loss: 3.0748660229269937, NLL: 6.2502
Layer: 2, Loss: 2.227713450305547, NLL: 2.4054
Layer: 3, Loss: 0.34255960649344885, NLL: 2.8408
Layer: 4, Loss: 0.05055666284729021, NLL: 2.6508
Layer: 5, Loss: 0.03001608874221695, NLL: 2.4369
Layer: 6, Loss: 0.06050847226788214, NLL: 2.2907
Layer: 7, Loss: 0, NLL: 2.1711
Layer: 8, Loss: 0, NLL: 2.0606
Layer: 9, Loss: 0.11026026420366541, NLL: 1.8956
Layer: 10, Loss: 0, NLL: 1.8185
Layer: 11, Loss: 0, NLL: 1.7268
Layer: 12, Loss: 0.086410072211903, NLL: 1.5862
Layer: 13, Loss: 0, NLL: 1.5302
Layer: 14, Loss: 0.00036237390461657526, NLL: 1.4756
Layer: 15, Loss: 0.008297986984954875, NLL: 1.4261
Layer: 16, Loss: 0.014594507496342501, NLL: 1.3855
Layer: 17, Loss: 0.009873224221399468, NLL: 1.3512
Layer: 18, Loss: 0, NLL: 1.3251
Layer: 19, Loss: 0.011758969328891311, NLL: 1.3032
Layer: 20, Loss: 0.004931759596233931, NLL: 1.2674
Layer: 21, Loss: 0.009953292543528391, NLL: 1.2509
Layer: 22, Loss: 0, NLL: 1.2377
Layer: 23, Loss: 0.018910490099894872, NLL: 1.2048
Layer: 24, Loss: 0, NLL: 1.1821
Layer: 25, Loss: 0.0013225997708872228, NLL: 1.1678
Layer: 26, Loss: 0, NLL: 1.1585
Layer: 27, Loss: 0.005072068901549542, NLL: 1.1269
Layer: 28, Loss: 0.018166758857068643, NLL: 1.1039
Layer: 29, Loss: 0, NLL: 1.0981
Layer: 30, Loss: 0.010119926795434608, NLL: 1.0836
Layer: 31, Loss: 0, NLL: 1.0888
Layer: 32, Loss: 0.0024449901763841098, NLL: 1.0800
Layer: 33, Loss: 0.0010861575293663606, NLL: 1.0686
Layer: 34, Loss: 0, NLL: 1.0614
Layer: 35, Loss: 0.0032398876949759625, NLL: 1.0488
Layer: 36, Loss: 0, NLL: 1.0496
Layer: 37, Loss: 0.0030772625337807114, NLL: 1.0326
Layer: 38, Loss: 0.0031331497381223983, NLL: 1.0372
Layer: 39, Loss: 0.0018083616602861952, NLL: 1.0302
Layer: 40, Loss: 0, NLL: 1.0355
Layer: 41, Loss: 0.015321963285071405, NLL: 1.0085
Layer: 42, Loss: 0, NLL: 1.0161
Layer: 43, Loss: 0.010954321779820475, NLL: 1.0018
Layer: 44, Loss: 0, NLL: 1.0091
Layer: 45, Loss: 0, NLL: 1.0071
Layer: 46, Loss: 0, NLL: 0.9989
Layer: 47, Loss: 0, NLL: 1.0029
Layer: 48, Loss: 0.004479260995511325, NLL: 0.9975
Layer: 49, Loss: 0, NLL: 1.0030
Layer: 50, Loss: 0.0008159168144477036, NLL: 1.0056
Layer: 51, Loss: 0, NLL: 1.0128
Layer: 52, Loss: 0.009578439132313932, NLL: 0.9993
Layer: 53, Loss: 0, NLL: 1.0073
Layer: 54, Loss: 0, NLL: 1.0072
Layer: 55, Loss: 0, NLL: 1.0172
Layer: 56, Loss: 0, NLL: 1.0079
Layer: 57, Loss: 0.0011573718171309366, NLL: 1.0151
Layer: 58, Loss: 0, NLL: 1.0156
Layer: 59, Loss: 0, NLL: 1.0160
Layer: 60, Loss: 0.00018886191949718523, NLL: 1.0182
Layer: 61, Loss: 0, NLL: 1.0233
Layer: 62, Loss: 0.0018351984701592272, NLL: 1.0312
Layer: 63, Loss: 0, NLL: 1.0298
Layer: 64, Loss: 0.006267365824321303, NLL: 1.0338
Layer: 65, Loss: 0, NLL: 1.0435
Layer: 66, Loss: 2.6173399254147967e-05, NLL: 1.0474
Layer: 67, Loss: 0, NLL: 1.0528
Layer: 68, Loss: 0, NLL: 1.0527
Layer: 69, Loss: 0, NLL: 1.0663
Layer: 70, Loss: 0, NLL: 1.0701
Layer: 71, Loss: 0, NLL: 1.0726
Layer: 72, Loss: 0, NLL: 1.0787
Layer: 73, Loss: 0, NLL: 1.0868
Layer: 74, Loss: 0, NLL: 1.1000
Layer: 75, Loss: 0.0032547598840295677, NLL: 1.1012
Layer: 76, Loss: 0, NLL: 1.1079
Layer: 77, Loss: 0, NLL: 1.1123
Layer: 78, Loss: 0, NLL: 1.1269
Layer: 79, Loss: 0, NLL: 1.1188
Layer: 80, Loss: 0, NLL: 1.1340
Layer: 81, Loss: 0, NLL: 1.1370
Layer: 82, Loss: 0, NLL: 1.1494
Layer: 83, Loss: 0.006870424028228683, NLL: 1.1476
Layer: 84, Loss: 0, NLL: 1.1547
Layer: 85, Loss: 0, NLL: 1.1685
Layer: 86, Loss: 0, NLL: 1.1648
Layer: 87, Loss: 0, NLL: 1.1797
Layer: 88, Loss: 0, NLL: 1.1924
Layer: 89, Loss: 0, NLL: 1.1963
Layer: 90, Loss: 0, NLL: 1.2083
Layer: 91, Loss: 0, NLL: 1.2244
Layer: 92, Loss: 0, NLL: 1.2225
Layer: 93, Loss: 0, NLL: 1.2325
Layer: 94, Loss: 0, NLL: 1.2398
Layer: 95, Loss: 0, NLL: 1.2481
Layer: 96, Loss: 0, NLL: 1.2528
Layer: 97, Loss: 0, NLL: 1.2581
Layer: 98, Loss: 0, NLL: 1.2715
Layer: 99, Loss: 0, NLL: 1.2786
Layer: 100, Loss: 0.0030371966905526904, NLL: 1.2859
Layer: 101, Loss: 0, NLL: 1.2926
Layer: 102, Loss: 0, NLL: 1.3045
Layer: 103, Loss: 0, NLL: 1.3133
Layer: 104, Loss: 0, NLL: 1.3134
Layer: 105, Loss: 0, NLL: 1.3266
Layer: 106, Loss: 0, NLL: 1.3341
Layer: 107, Loss: 0, NLL: 1.3453
Layer: 108, Loss: 0, NLL: 1.3508
Layer: 109, Loss: 0, NLL: 1.3609
Layer: 110, Loss: 0, NLL: 1.3650
Layer: 111, Loss: 0, NLL: 1.3761
Layer: 112, Loss: 0, NLL: 1.3841
Layer: 113, Loss: 0, NLL: 1.3972
Layer: 114, Loss: 0, NLL: 1.3957
Layer: 115, Loss: 0, NLL: 1.4138
Layer: 116, Loss: 0, NLL: 1.4191
Layer: 117, Loss: 0, NLL: 1.4260
Layer: 118, Loss: 0, NLL: 1.4405
Layer: 119, Loss: 0, NLL: 1.4437
Layer: 120, Loss: 0.0030371966905526904, NLL: 1.4580
2020-05-12 23:25:47,349: INFO: Time Taken: 80.82 secs
from src.experiments.utils import run_parallel_step
from sklearn.utils import gen_batches
from joblib import delayed, Parallel
import joblib
slices = list(gen_batches(density_cube_df_norm.values.shape[0], 1000))
density_cube_df_norm.values[slices[0]].shape, len(slices)
((1000, 6), 2606)
from dask.distributed import Client
client = Client(processes=False)
client

Client

Cluster

  • Workers: 1
  • Cores: 28
  • Memory: 270.18 GB
import dask.dataframe as dd
df_dask = dd.from_pandas(density_cube_df_norm.reset_index(), chunksize=2_000)
df_dask
Dask DataFrame Structure:
                            time      lat      lon   var_x0   var_x1   var_x2   var_x3   var_x4   var_x5
npartitions=1303
0                 datetime64[ns]  float64  float64  float32  float32  float32  float32  float32  float32
2000                         ...      ...      ...      ...      ...      ...      ...      ...      ...
...                          ...      ...      ...      ...      ...      ...      ...      ...      ...
2604000                      ...      ...      ...      ...      ...      ...      ...      ...      ...
2605211                      ...      ...      ...      ...      ...      ...      ...      ...      ...
Dask Name: from_pandas, 1303 tasks
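The partition count in the repr checks out: 2,605,212 rows at a chunksize of 2,000 gives 1303 partitions.

# Sanity check on the repr above.
import math
assert math.ceil(2_605_212 / 2_000) == 1303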

df_dask[['var_x0', 'var_x1', 'var_x2', 'var_x3', 'var_x4', 'var_x5']].values
         Array        Chunk
Bytes    unknown      unknown
Shape    (nan, 6)     (nan, 6)
Count    3909 Tasks   1303 Chunks
Type     float32      numpy.ndarray
# from dask_ml.wrappers import ParallelPostFit
from sklearn.utils import gen_batches
import joblib
from joblib import Parallel, delayed
slices = list(gen_batches(density_cube_df_norm.values.shape[0], 1000))
# clf = ParallelPostFit(rbig_model)  # left commented: needs the dask_ml import above
with joblib.parallel_backend('dask'):
    jobs = (delayed(rbig_model.score_samples)(
        density_cube_df_norm.values[islice]) for islice in slices
               )

    parallel = Parallel(verbose=1)

    results = parallel(jobs)
[Parallel(n_jobs=-1)]: Using backend DaskDistributedBackend with 28 concurrent workers.
/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/scipy/stats/_distn_infrastructure.py:1702: RuntimeWarning: divide by zero encountered in log
  return log(self._pdf(x, *args))
(the same RuntimeWarning was emitted 14 times in total; duplicates elided)
2020-05-12 23:23:23,102: ERROR: Internal Python error in the inspect module.
Below is the traceback from this internal error.

Traceback (most recent call last):
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/_dask.py", line 223, in maybe_to_futures
    f = call_data_futures[arg]
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/_dask.py", line 56, in __getitem__
    ref, val = self._data[id(obj)]
KeyError: 140172999264976

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3331, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-14-df71624544ba>", line 8, in <module>
    results = parallel(jobs)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/parallel.py", line 1007, in __call__
    while self.dispatch_one_batch(iterator):
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/parallel.py", line 835, in dispatch_one_batch
    self._dispatch(tasks)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/parallel.py", line 754, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/_dask.py", line 254, in apply_async
    func, args = self._to_func_args(func)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/_dask.py", line 243, in _to_func_args
    args = list(maybe_to_futures(args))
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/_dask.py", line 231, in maybe_to_futures
    [f] = self.client.scatter([arg])
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/distributed/client.py", line 2164, in scatter
    return self.sync(
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/distributed/client.py", line 815, in sync
    return sync(
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/distributed/utils.py", line 344, in sync
    e.wait(10)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/threading.py", line 558, in wait
    signaled = self._cond.wait(timeout)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/threading.py", line 306, in wait
    gotit = waiter.acquire(True, timeout)
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 2044, in showtraceback
    stb = value._render_traceback_()
AttributeError: 'KeyboardInterrupt' object has no attribute '_render_traceback_'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/ultratb.py", line 1148, in get_records
    return _fixed_getinnerframes(etb, number_of_lines_of_context, tb_offset)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/ultratb.py", line 316, in wrapped
    return f(*args, **kwargs)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/ultratb.py", line 350, in _fixed_getinnerframes
    records = fix_frame_records_filenames(inspect.getinnerframes(etb, context))
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/inspect.py", line 1503, in getinnerframes
    frameinfo = (tb.tb_frame,) + getframeinfo(tb, context)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/inspect.py", line 1461, in getframeinfo
    filename = getsourcefile(frame) or getfile(frame)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/inspect.py", line 708, in getsourcefile
    if getattr(getmodule(object, filename), '__loader__', None) is not None:
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/inspect.py", line 751, in getmodule
    f = getabsfile(module)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/inspect.py", line 720, in getabsfile
    _filename = getsourcefile(object) or getfile(object)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/inspect.py", line 705, in getsourcefile
    if os.path.exists(filename):
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/genericpath.py", line 19, in exists
    os.stat(path)
KeyboardInterrupt
2020-05-12 23:23:23,247: INFO: 
Unfortunately, your original traceback can not be constructed.

/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/scipy/stats/_distn_infrastructure.py:1702: RuntimeWarning: divide by zero encountered in log
  return log(self._pdf(x, *args))
/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/scipy/stats/_distn_infrastructure.py:1702: RuntimeWarning: divide by zero encountered in log
  return log(self._pdf(x, *args))
2020-05-12 23:23:24,686: ERROR: Internal Python error in the inspect module.
Below is the traceback from this internal error.

Traceback (most recent call last):
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/_dask.py", line 223, in maybe_to_futures
    f = call_data_futures[arg]
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/_dask.py", line 56, in __getitem__
    ref, val = self._data[id(obj)]
KeyError: 140172999264976

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3331, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-14-df71624544ba>", line 8, in <module>
    results = parallel(jobs)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/parallel.py", line 1007, in __call__
    while self.dispatch_one_batch(iterator):
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/parallel.py", line 835, in dispatch_one_batch
    self._dispatch(tasks)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/parallel.py", line 754, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/_dask.py", line 254, in apply_async
    func, args = self._to_func_args(func)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/_dask.py", line 243, in _to_func_args
    args = list(maybe_to_futures(args))
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/_dask.py", line 231, in maybe_to_futures
    [f] = self.client.scatter([arg])
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/distributed/client.py", line 2164, in scatter
    return self.sync(
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/distributed/client.py", line 815, in sync
    return sync(
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/distributed/utils.py", line 344, in sync
    e.wait(10)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/threading.py", line 558, in wait
    signaled = self._cond.wait(timeout)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/threading.py", line 306, in wait
    gotit = waiter.acquire(True, timeout)
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 2044, in showtraceback
    stb = value._render_traceback_()
AttributeError: 'KeyboardInterrupt' object has no attribute '_render_traceback_'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3254, in run_ast_nodes
    if (await self.run_code(code, result,  async_=asy)):
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3348, in run_code
    self.showtraceback(running_compiled_code=True)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 2046, in showtraceback
    stb = self.InteractiveTB.structured_traceback(etype,
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/ultratb.py", line 1414, in structured_traceback
    return FormattedTB.structured_traceback(
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/ultratb.py", line 1314, in structured_traceback
    return VerboseTB.structured_traceback(
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/ultratb.py", line 1183, in structured_traceback
    formatted_exceptions += self.prepare_chained_exception_message(evalue.__cause__)
TypeError: can only concatenate str (not "list") to str

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 2044, in showtraceback
    stb = value._render_traceback_()
AttributeError: 'TypeError' object has no attribute '_render_traceback_'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/ultratb.py", line 1148, in get_records
    return _fixed_getinnerframes(etb, number_of_lines_of_context, tb_offset)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/ultratb.py", line 316, in wrapped
    return f(*args, **kwargs)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/ultratb.py", line 350, in _fixed_getinnerframes
    records = fix_frame_records_filenames(inspect.getinnerframes(etb, context))
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/inspect.py", line 1503, in getinnerframes
    frameinfo = (tb.tb_frame,) + getframeinfo(tb, context)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/inspect.py", line 1461, in getframeinfo
    filename = getsourcefile(frame) or getfile(frame)
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/inspect.py", line 708, in getsourcefile
    if getattr(getmodule(object, filename), '__loader__', None) is not None:
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/inspect.py", line 754, in getmodule
    os.path.realpath(f)] = module.__name__
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/posixpath.py", line 391, in realpath
    path, ok = _joinrealpath(filename[:0], filename, {})
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/posixpath.py", line 425, in _joinrealpath
    if not islink(newpath):
  File "/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/posixpath.py", line 167, in islink
    st = os.lstat(path)
KeyboardInterrupt
2020-05-12 23:23:24,741: INFO: 
Unfortunately, your original traceback can not be constructed.

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
~/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/_dask.py in maybe_to_futures(args)
    222                     try:
--> 223                         f = call_data_futures[arg]
    224                     except KeyError:

~/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/_dask.py in __getitem__(self, obj)
     55     def __getitem__(self, obj):
---> 56         ref, val = self._data[id(obj)]
     57         if ref() is not obj:

KeyError: 140172999264976

During handling of the above exception, another exception occurred:


During handling of the above exception, another exception occurred:

AttributeError                            Traceback (most recent call last)
~/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/interactiveshell.py in showtraceback(self, exc_tuple, filename, tb_offset, exception_only, running_compiled_code)
   2043                         # in the engines. This should return a list of strings.
-> 2044                         stb = value._render_traceback_()
   2045                     except Exception:

AttributeError: 'KeyboardInterrupt' object has no attribute '_render_traceback_'

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
~/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/interactiveshell.py in run_code(self, code_obj, result, async_)
   3346             if result is not None:
   3347                 result.error_in_exec = sys.exc_info()[1]
-> 3348             self.showtraceback(running_compiled_code=True)
   3349         else:
   3350             outflag = False

~/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/interactiveshell.py in showtraceback(self, exc_tuple, filename, tb_offset, exception_only, running_compiled_code)
   2044                         stb = value._render_traceback_()
   2045                     except Exception:
-> 2046                         stb = self.InteractiveTB.structured_traceback(etype,
   2047                                             value, tb, tb_offset=tb_offset)
   2048 

~/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/ultratb.py in structured_traceback(self, etype, value, tb, tb_offset, number_of_lines_of_context)
   1412         else:
   1413             self.tb = tb
-> 1414         return FormattedTB.structured_traceback(
   1415             self, etype, value, tb, tb_offset, number_of_lines_of_context)
   1416 

~/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/ultratb.py in structured_traceback(self, etype, value, tb, tb_offset, number_of_lines_of_context)
   1312         if mode in self.verbose_modes:
   1313             # Verbose modes need a full traceback
-> 1314             return VerboseTB.structured_traceback(
   1315                 self, etype, value, tb, tb_offset, number_of_lines_of_context
   1316             )

~/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/IPython/core/ultratb.py in structured_traceback(self, etype, evalue, etb, tb_offset, number_of_lines_of_context)
   1181         exception = self.get_parts_of_chained_exception(evalue)
   1182         if exception:
-> 1183             formatted_exceptions += self.prepare_chained_exception_message(evalue.__cause__)
   1184             etype, evalue, etb = exception
   1185         else:

TypeError: can only concatenate str (not "list") to str
from dask.distributed import Client

client = Client("tcp://127.0.0.1:58499")
client
/home/emmanuel/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/distributed/client.py:1115: VersionMismatchWarning: Mismatched versions found

dask
+-----------------------+---------+
|                       | version |
+-----------------------+---------+
| client                | 2.16.0  |
| scheduler             | 2.15.0  |
| tcp://127.0.0.1:32791 | 2.15.0  |
| tcp://127.0.0.1:36857 | 2.15.0  |
| tcp://127.0.0.1:37394 | 2.15.0  |
| tcp://127.0.0.1:37631 | 2.15.0  |
| tcp://127.0.0.1:55733 | 2.15.0  |
| tcp://127.0.0.1:56943 | 2.15.0  |
| tcp://127.0.0.1:58234 | 2.15.0  |
+-----------------------+---------+

distributed
+-----------------------+---------+
|                       | version |
+-----------------------+---------+
| client                | 2.16.0  |
| scheduler             | 2.15.1  |
| tcp://127.0.0.1:32791 | 2.15.1  |
| tcp://127.0.0.1:36857 | 2.15.1  |
| tcp://127.0.0.1:37394 | 2.15.1  |
| tcp://127.0.0.1:37631 | 2.15.1  |
| tcp://127.0.0.1:55733 | 2.15.1  |
| tcp://127.0.0.1:56943 | 2.15.1  |
| tcp://127.0.0.1:58234 | 2.15.1  |
+-----------------------+---------+

python
+-----------------------+---------------+
|                       | version       |
+-----------------------+---------------+
| client                | 3.8.2.final.0 |
| scheduler             | 3.7.7.final.0 |
| tcp://127.0.0.1:32791 | 3.7.7.final.0 |
| tcp://127.0.0.1:36857 | 3.7.7.final.0 |
| tcp://127.0.0.1:37394 | 3.7.7.final.0 |
| tcp://127.0.0.1:37631 | 3.7.7.final.0 |
| tcp://127.0.0.1:55733 | 3.7.7.final.0 |
| tcp://127.0.0.1:56943 | 3.7.7.final.0 |
| tcp://127.0.0.1:58234 | 3.7.7.final.0 |
+-----------------------+---------------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))

Client

Cluster

  • Workers: 7
  • Cores: 28
  • Memory: 270.18 GB

jobs = (delayed(rbig_model.score_samples)(
    density_cube_df_norm.values[islice]) for islice in slices
           )

parallel = Parallel(n_jobs=8,verbose=2)

results = parallel(jobs)
[Parallel(n_jobs=8)]: Using backend LokyBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done  25 tasks      | elapsed:   11.8s
[Parallel(n_jobs=8)]: Done 146 tasks      | elapsed:   42.1s
[Parallel(n_jobs=8)]: Done 349 tasks      | elapsed:  1.7min
[Parallel(n_jobs=8)]: Done 632 tasks      | elapsed:  2.9min
[Parallel(n_jobs=8)]: Done 997 tasks      | elapsed:  4.4min
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-119-8d1e1d84f5fd> in <module>
      5 parallel = Parallel(n_jobs=8,verbose=2)
      6 
----> 7 results = parallel(jobs)

~/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/parallel.py in __call__(self, iterable)
   1015 
   1016             with self._backend.retrieval_context():
-> 1017                 self.retrieve()
   1018             # Make sure that we get a last message telling us we are done
   1019             elapsed_time = time.time() - self._start_time

~/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/parallel.py in retrieve(self)
    907             try:
    908                 if getattr(self._backend, 'supports_timeout', False):
--> 909                     self._output.extend(job.get(timeout=self.timeout))
    910                 else:
    911                     self._output.extend(job.get())

~/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/joblib/_parallel_backends.py in wrap_future_result(future, timeout)
    560         AsyncResults.get from multiprocessing."""
    561         try:
--> 562             return future.result(timeout=timeout)
    563         except LokyTimeoutError:
    564             raise TimeoutError()

~/.conda/envs/rbig_eo_dev/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    432                 return self.__get_result()
    433 
--> 434             self._condition.wait(timeout)
    435 
    436             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

~/.conda/envs/rbig_eo_dev/lib/python3.8/threading.py in wait(self, timeout)
    300         try:    # restore state no matter what (e.g., KeyboardInterrupt)
    301             if timeout is None:
--> 302                 waiter.acquire()
    303                 gotit = True
    304             else:

KeyboardInterrupt: 
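For completeness: had the Loky run above been left to finish, each entry of results would hold the score_samples output for one slice, and the batches could be stitched back together in order. A sketch, assuming each batch returns a 1-D array of per-sample log-densities:

# Sketch: reassemble per-batch log-densities into a single DataFrame.
# Assumes `results` is a list of 1-D arrays, one per slice in `slices`,
# in the same order the batches were generated.
import numpy as np
import pandas as pd

log_probs = np.concatenate([np.asarray(r).ravel() for r in results])
X_prob = pd.DataFrame(
    data=log_probs,
    index=density_cube_df_norm.index,
    columns=['log_probability'],
)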
# get the probability estimates
logging.info(f"Getting probability estimates...")
t0 = time.time()
# subsamples = 500_000
# subset = subset_indices(density_cube_df_norm.values, subsamples)

# run_parallel_step(rbig_model.score_samples)  # leftover call; score_samples is invoked directly below
X_prob = rbig_model.score_samples(density_cube_df_norm.values)
t1 = time.time() - t0
logging.info(f"Time Taken: {t1:.2f} secs")
X_prob = pd.DataFrame(
    data=X_prob.squeeze(), 
    index=density_cube_df_norm.index,
    columns=['log_probability']
)
# nll = rbig_model.score(subset.values)
# logging.info(f"Log Likelihood: {nll:.4f}")
2020-05-12 22:39:32,594: INFO: Getting probability estimates...
/home/emmanuel/code/rbig/rbig/density/kde.py:159: RuntimeWarning: divide by zero encountered in log
  return np.log(self.pdf(X))
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-86-3230df975467> in <module>
      4 # subsamples = 500_000
      5 # subset = subset_indices(density_cube_df_norm.values, subsamples)
----> 6 X_prob = rbig_model.score_samples(density_cube_df_norm.values)
      7 t1 = time.time() - t0
      8 logging.info(f"Time Taken: {t1:.2f} secs")

~/code/rbig/rbig/models/flows.py in score_samples(self, X, y)
    176         # transform
    177 
--> 178         Z, dX = self.transform(X)
    179         prior_logprob = stats.norm().logpdf(Z)
    180 

~/code/rbig/rbig/models/flows.py in transform(self, X, y, return_jacobian)
    155         for iflow in self.flows_:
    156 
--> 157             X, dX = iflow.transform(X, None, return_jacobian=True)
    158 
    159             X_slogdet += dX

~/code/rbig/rbig/layers/rbig_layer.py in transform(self, X, y, return_jacobian)
     57         if self.is_fitted is True:
     58             logging.debug(f"Already fitted. Transforning!")
---> 59             Xmg = self.mg_transform.transform(X)
     60             Xtrans = self.rot_transform.transform(Xmg)
     61         else:

~/code/rbig/rbig/transform/marginal.py in transform(self, X)
     83             logging.debug(f"Transforming Feature")
     84             X[:, feature_idx] = (
---> 85                 self.transforms_[feature_idx].transform(X[:, feature_idx]).squeeze()
     86             )
     87 

~/code/rbig/rbig/transform/gaussianization.py in transform(self, X)
     71 
     72         # transform data to gaussian domain
---> 73         X = InverseGaussCDF().transform(X)
     74 
     75         # return gaussianized variable

~/code/rbig/rbig/transform/gauss_icdf.py in transform(self, X)
     35         X = make_interior(X, bounds=(0.0, 1.0),)
     36 
---> 37         return self._transform(X, stats.norm.ppf, inverse=False)
     38 
     39     def inverse_transform(self, X: np.ndarray):

~/code/rbig/rbig/transform/gauss_icdf.py in _transform(self, X, f, inverse)
     61 
     62         # perform transformation
---> 63         X = f(X)
     64 
     65         if inverse == False:

~/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/scipy/stats/_distn_infrastructure.py in ppf(self, q, *args, **kwds)
   2009             goodargs = argsreduce(cond, *((q,)+args+(scale, loc)))
   2010             scale, loc, goodargs = goodargs[-2], goodargs[-1], goodargs[:-2]
-> 2011             place(output, cond, self._ppf(*goodargs) * scale + loc)
   2012         if output.ndim == 0:
   2013             return output[()]

~/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/scipy/stats/_continuous_distns.py in _ppf(self, q)
    255 
    256     def _ppf(self, q):
--> 257         return _norm_ppf(q)
    258 
    259     def _isf(self, q):

~/.conda/envs/rbig_eo_dev/lib/python3.8/site-packages/scipy/stats/_continuous_distns.py in _norm_ppf(q)
    194 
    195 def _norm_ppf(q):
--> 196     return sc.ndtri(q)
    197 
    198 

KeyboardInterrupt: 
def get_information(df):

    df = df.reset_index().set_index(['time', 'lat', 'lon'])

#     # convert to datetime
#     df.index['time'] = pd.to_datetime(df.index['time'])


    # remove probabilities greater than 1
#     df['probability'][df['probability'] >= 1.0] = np.nan
    df['probability'] = np.exp(df['log_probability'])

    # create xarray cubes
    probs_cubes = xr.Dataset.from_dataframe(df)

    # resample monthly
    probs_cubes = probs_cubes.resample(time='1MS').mean()

    # shannon information in bits: -log2(p) = -ln(p) / ln(2)
    probs_cubes['shannon_info'] = -np.log(probs_cubes['probability']) / np.log(2)
    return probs_cubes
probs_cubes = get_information(X_prob)
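A quick unit check on the bits conversion inside get_information: surprisal in bits is -log2(p) = -ln(p) / ln(2), so the natural-log term is divided by ln(2), not multiplied.

# Nats-to-bits conversion check.
import numpy as np
p = 0.25
assert np.isclose(-np.log(p) / np.log(2), -np.log2(p))  # both give 2 bits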
fig, ax = plt.subplots()
probs_cubes.probability.mean(dim='time').plot(
    ax=ax,
    vmin=0, robust=True, cmap='Reds', 
    cbar_kwargs={'label': "Probability"}
)

fig, ax = plt.subplots()
probs_cubes.log_probability.mean(dim='time').plot(
    ax=ax, robust=True, cmap='Reds', 
    cbar_kwargs={'label': "Log Probability"}
)

fig, ax = plt.subplots()
probs_cubes.shannon_info.mean(dim='time').plot(
    ax=ax,
    vmin=0, robust=True, cmap='Reds', 
    cbar_kwargs={'label': "Shannon Information",}
)
<matplotlib.collections.QuadMesh at 0x7f1836ab3970>
rbig_model.total_correlation()
75.06944987460885
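For reference, the quantity reported by total_correlation is the multi-information T(x) = H(x_1) + ... + H(x_d) - H(x_1, ..., x_d), i.e. the KL divergence between the joint density and the product of its marginals; it is zero exactly when the dimensions are independent, and RBIG estimates it by accumulating the information removed at each layer. A toy case with a known closed form (my sketch, not the package API):

# For a bivariate Gaussian with correlation rho, the total correlation
# (= mutual information) is T = -0.5 * ln(1 - rho**2), in nats.
import numpy as np
rho = 0.8
print(-0.5 * np.log(1 - rho**2))  # ~0.5108 nats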
fig, ax = plt.subplots()
probs_cubes.probability.mean(dim='time').plot(
    ax=ax,
    vmin=0, robust=True, cmap='Reds', 
    cbar_kwargs={'label': "Probability"}
)

fig, ax = plt.subplots()
probs_cubes.shannon_info.mean(dim='time').plot(
    ax=ax,
    vmin=0, robust=True, cmap='Reds', 
    cbar_kwargs={'label': "Shannon Information"}
)
<matplotlib.collections.QuadMesh at 0x7f18041d6e80>
# =========================
# Model - Gaussianization
# =========================
# NOTE: this cell (and the `step` function below) appears to use the earlier
# RBIG interface, a `get_rbig_model(X=...)` that fits on construction and
# exposes `predict_proba` (cf. the commented-out `src.models.density` import
# at the top), not the `get_rbig_model()` defined earlier in this notebook.

# Gaussianize the data
logging.info(f"Gaussianizing data...")
t0 = time.time()
rbig_model = get_rbig_model(X=density_cube_df_norm.values)
t1 = time.time() - t0
logging.info(f"Time Taken: {t1:.2f} secs")

# get the probability estimates
logging.info(f"Getting probability estimates...")
t0 = time.time()
X_prob = rbig_model.predict_proba(
    density_cube_df_norm.values, n_trials=20, chunksize=10_000, domain='input'
)
t1 = time.time() - t0
logging.info(f"Time Taken: {t1:.2f} secs")

X_prob = pd.DataFrame(
    data=X_prob, 
    index=density_cube_df_norm.index,
    columns=['probability']
)

variables = [
    'root_moisture',
    'soil_moisture',
    'gross_primary_productivity',
    'land_surface_temperature',
    'precipitation',
]
datacube = get_dataset(variables)
datacube
xarray.Dataset
Dimensions:                     (lat: 720, lon: 1440, time: 1702)
Coordinates:
  * lat                         (lat) float32 89.875 89.625 ... -89.625 -89.875
  * lon                         (lon) float32 -179.875 -179.625 ... 179.625 179.875
  * time                        (time) datetime64[ns] 1980-01-05 ... 2016-12-30
Data variables:
    root_moisture               (time, lat, lon) float32 dask.array<chunksize=(1, 720, 1440), meta=np.ndarray>
    soil_moisture               (time, lat, lon) float32 dask.array<chunksize=(1, 720, 1440), meta=np.ndarray>
    gross_primary_productivity  (time, lat, lon) float32 dask.array<chunksize=(1, 720, 1440), meta=np.ndarray>
    land_surface_temperature    (time, lat, lon) float32 dask.array<chunksize=(1, 720, 1440), meta=np.ndarray>
    precipitation               (time, lat, lon) float32 dask.array<chunksize=(1, 720, 1440), meta=np.ndarray>
Attributes:
    title:                     Earth System Data Cube
    project:                   ESA Earth System Data Lab
    institution:               Brockmann Consult GmbH
    geospatial_resolution:     1/4deg
    time_coverage_start:       05.01.1980
    time_coverage_end:         30.12.2016
    time_coverage_resolution:  P8D
    ...

Variable sources and units: root_moisture (GLEAM v3.2, m3/m3), soil_moisture (ESA SoilMoisture CCI v04.2, m3), gross_primary_productivity (FLUXCOM, gC m-2 day-1), land_surface_temperature (GlobTemperature, K), precipitation (GPCP, mm/day). Each variable is a 7.06 GB dask array of shape (1702, 720, 1440), chunked (1, 720, 1440).
mean_cube = datacube.mean(dim=['lat', 'lon']).sel(time=slice('2002', '2010'))
mean_cube = (mean_cube - mean_cube.mean()) / mean_cube.std()
# mean_cube = datacube.mean(dim=['lat', 'lon'])
# mean_cube = (mean_cube - mean_cube.mean) / mean_cube.std()
fig, ax = plt.subplots()
for ivariable in variables:
    mean_cube[ivariable].plot.line(ax=ax, label=ivariable)

ax.legend()
plt.tight_layout()
plt.show()

Experimental Step

datacube = get_dataset([params[0]['variable']])
datacube
xarray.Dataset
Dimensions:        (lat: 720, lon: 1440, time: 1702)
Coordinates:
  * lat            (lat) float32 89.875 89.625 ... -89.625 -89.875
  * lon            (lon) float32 -179.875 -179.625 ... 179.625 179.875
  * time           (time) datetime64[ns] 1980-01-05 ... 2016-12-30
Data variables:
    root_moisture  (time, lat, lon) float32 dask.array<chunksize=(1, 720, 1440), meta=np.ndarray>
Attributes:
    title:                     Earth System Data Cube
    project:                   ESA Earth System Data Lab
    geospatial_resolution:     1/4deg
    time_coverage_resolution:  P8D
    ...

(root_moisture: GLEAM v3.2, units m3/m3; a 7.06 GB dask array of shape (1702, 720, 1440), chunked (1, 720, 1440).)
from src.models.density import get_rbig_model

def step(params: Dict, smoke_test: bool=False):
    # smoke_test is accepted for API consistency but not used below
    # ======================
    # experiment - Data
    # ======================
    # Get DataCube
    logging.info(f"Loading '{params['variable']}' variable")
    datacube = get_dataset([params['variable']])

    # subset datacube (spatially)
    logging.info(f"Selecting region '{params['region'].name}'")
    datacube = select_region(
        xr_data=datacube, bbox=params['region']
    )

    # subset datacube (temporally)
    logging.info(f"Selecting temporal period: '{params['period'].name}'")
    datacube = select_period(xr_data=datacube, period=params['period'])

    # get density cubes (space-time neighbourhoods as features)
    logging.info(f"Getting density cubes: S: {params['spatial']}, T: {params['temporal']}")
    density_cube_df = get_density_cubes(
        data=datacube[params['variable']], 
        spatial=params['spatial'], 
        temporal=params['temporal']
    )
    logging.info(f"Total data: {density_cube_df.shape}")

    # standardize data (zero mean, unit variance per feature)
    logging.info("Standardizing data...")
    x_transformer = StandardScaler().fit(density_cube_df.values)

    density_cube_df_norm = pd.DataFrame(
        data=x_transformer.transform(density_cube_df.values),
        columns=density_cube_df.columns.values,
        index=density_cube_df.index
    )

    # =========================
    # Model - Gaussianization
    # =========================

    # Gaussianize the data
    logging.info("Gaussianizing data...")
    t0 = time.time()
    rbig_model = get_rbig_model(X=density_cube_df_norm.values)
    t1 = time.time() - t0
    logging.info(f"Time Taken: {t1:.2f} secs")

    # get the probability estimates
    logging.info("Getting probability estimates...")
    t0 = time.time()
    X_prob = rbig_model.predict_proba(
        density_cube_df_norm.values, n_trials=20, chunksize=10_000, domain='input'
    )
    t1 = time.time() - t0
    logging.info(f"Time Taken: {t1:.2f} secs")

    X_prob = pd.DataFrame(
        data=X_prob, 
        index=density_cube_df_norm.index,
    )

    return density_cube_df, rbig_model, x_transformer, X_prob
density_cube_df, rbig_model, X_transformer, X_prob = step(params, smoke_test=True)
2020-05-10 12:25:34,967:INFO:Loading 'root_moisture' variable
2020-05-10 12:25:35,665:INFO:Selecting region 'spain'
2020-05-10 12:25:35,684:INFO:Selecting temporal period: 'TimePeriod(name='201001_201012', start='Jan-2002', end='Dec-2010')'
2020-05-10 12:25:35,689:INFO:Getting density cubes: S: 7, T: 1
2020-05-10 12:25:36,684:INFO:Total data: (165232, 49)
2020-05-10 12:25:36,685:INFO:Standardizing data...
2020-05-10 12:25:36,845:INFO:Gaussianizing data...
2020-05-10 12:42:13,761:INFO:Time Taken: 996.91 secs
2020-05-10 12:42:13,762:INFO:Getting probability estimates...
density_cube_df.head()
var_x0 prob
time lat lon
2010-01-05 43.625 -8.375 0.159887 0.263087
-8.125 0.159701 0.260425
-7.875 0.446944 0.147057
-7.625 0.486189 0.095024
-7.375 0.158740 0.245753
plt.hist(density_cube_df['var_x0'].squeeze(), bins=50, density=True);
plt.hist(density_cube_df['prob'].squeeze(), bins=50, range=(0, 1.0));
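
The first histogram shows the marginal distribution of the raw var_x0 values; the second shows the distribution of the estimated probabilities over [0, 1].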
X.head()
var_x0
time lat lon
2010-01-05 43.625 -8.375 -1.336740
-8.125 -1.338682
-7.875 1.657619
-7.625 2.066988
-7.375 -1.348702

Visualization

prob_cube = xr.Dataset.from_dataframe(X_prob.rename(columns={0: "prob"}))
prob_cube["prob"].mean(dim="time").plot(vmin=0)
plt.tight_layout()

Extra Features

In the previous example I took the data as-is: each sample was a single pixel value, and I fit the density estimator directly. In this example I will add extra features by stacking each pixel's spatial-temporal neighbourhood (see the sketch below), and then we can compare how the probability estimates change.
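
Below is a minimal sketch of what the density-cube feature construction does. This is not the repo's DensityCubes/get_density_cubes implementation; `da`, `build_density_cube`, and the window dimension names are illustrative. For spatial=3 and temporal=2, each (time, lat, lon) sample becomes its 3x3x2 space-time neighbourhood, flattened into 18 features.

def build_density_cube(da: xr.DataArray, spatial: int, temporal: int) -> pd.DataFrame:
    # trailing windows over time plus a spatial x spatial patch around each pixel
    windowed = da.rolling(time=temporal, lat=spatial, lon=spatial).construct(
        time="t_win", lat="y_win", lon="x_win"
    )
    # one row per (time, lat, lon) sample, one column per neighbourhood element
    stacked = windowed.stack(
        sample=("time", "lat", "lon"),
        feature=("t_win", "y_win", "x_win"),
    )
    # drop edge samples whose windows fall outside the cube
    return stacked.to_pandas().dropna()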

parameters = {}
parameters['variable'] = 'root_moisture'
parameters['region'] = get_spain()
parameters['period'] = TimePeriod(name="200201_201012", start="Jan-2002", end="Dec-2010")
parameters['spatial'] = 3
parameters['temporal'] = 2
density_cube_df, rbig_model, X_transformer, X_prob = step(parameters, smoke_test=True)

X.head()
var_x0 prob
time lat lon
2010-01-05 43.625 -8.375 -1.336740 0.263087
-8.125 -1.338682 0.260425
-7.875 1.657619 0.147057
-7.625 2.066988 0.095024
-7.375 -1.348702 0.245753
X_prob.shape, X_prob.max(), X_prob.min()
((42044,), 14.59578649691485, 0.0)
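Note that predict_proba returns probability density estimates rather than probabilities, so values above one (here a maximum of roughly 14.6) are expected: it is the integral of the density that is bounded by one, not its pointwise value.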
plt.hist(X_prob[X_prob < 1.], bins=100);
res
{'rv_coeff': 0.9697245,
 'rv_x_norm': 26692.102,
 'rv_y_norm': 77907.13,
 'rv_xy_norm': 2016547100.0,
 'rbig_H_x': 1.855240533094599,
 'rbig_H_y': 1.0902273375895914,
 'rbig_I_xy': 5.405821100129361,
 'rbig_vi_coeff': 3.801045104354525}
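Sanity check: rv_xy_norm / (rv_x_norm * rv_y_norm) = 2016547100 / (26692.102 * 77907.13) ≈ 0.9697, which is exactly rv_coeff, so the reported value follows the standard RV-coefficient definition. A minimal numpy sketch of that definition (for intuition only; not necessarily the exact implementation behind rv_coefficient in src):

def rv_coefficient_sketch(X: np.ndarray, Y: np.ndarray) -> float:
    """RV(X, Y) = <XX', YY'>_F / (||XX'||_F * ||YY'||_F)."""
    Sx = X @ X.T                  # (n, n) Gram matrix of X
    Sy = Y @ Y.T                  # (n, n) Gram matrix of Y
    xy_norm = np.sum(Sx * Sy)     # Frobenius inner product <Sx, Sy>_F
    return xy_norm / (np.linalg.norm(Sx) * np.linalg.norm(Sy))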
from typing import Tuple
import shapely.geometry
from prefect import task, Flow, Parameter

from xcube.core.geom import clip_dataset_by_geometry
from esdc.transform import DensityCubes
from src.features.spatial import Region  # assumed to live alongside get_spain/get_europe

# index order shared by all (time, lat, lon) dataframes below;
# rv_coefficient / rbig_it_measures are the repo's similarity metrics,
# imported elsewhere in the notebook
levels = ["time", "lat", "lon"]

@task # get Dataset
def get_dataset(variable: str) -> xr.Dataset:
    # `filename` is the path to the zarr datacube store (assumed defined earlier)
    return xr.open_zarr(str(filename))[[variable]]

@task # subset datacube
def cube_spatial_subset(xr_data: xr.Dataset, bbox: Region) -> xr.Dataset:
    """Function to spatially subset an xarray dataset from a bounding box."""
    # get bounding box
    bbox = shapely.geometry.box(
        bbox.lonmin,
        bbox.latmin,
        bbox.lonmax,
        bbox.latmax
    )
    # subset datacube
    return clip_dataset_by_geometry(xr_data, bbox)

@task # subset datacube (temporally)
def cube_temporal_subset(xr_data: xr.DataArray, period: TimePeriod) -> xr.DataArray:
    """Function to temporally subset an xarray dataset from a period
    with start and end dates.
    """
    return xr_data.sel(time=slice(period.start, period.end))

@task # get reference cube
def get_reference_cube(data: xr.DataArray) -> pd.DataFrame:
    """Wrapper Function to get reference cube"""
    return data.to_dataframe().dropna().reorder_levels(levels)

@task # get density cubes
def get_density_cubes(data: xr.DataArray, spatial: int, temporal: int) -> pd.DataFrame:
    """Wrapper Function to get density cubes from a dataarray"""
    return DensityCubes(
        spatial_window=spatial,
        time_window=temporal
    ).get_minicubes(data).reorder_levels(levels)

@task # get common indices
def get_common_indices(
    reference_df: pd.DataFrame, density_df: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    idx = density_df.index.intersection(reference_df.index)
    return reference_df.loc[idx,:], density_df.loc[idx, :]

@task # standardize the data before
def standardizer_data(X: pd.DataFrame, Y: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:

    # standardizer
    normalizer = StandardScaler(with_mean=True, with_std=True)

    # standardize X values
    X_values = normalizer.fit_transform(X.values)
    X = pd.DataFrame(data=X_values, index=X.index, columns=X.columns)

    # standardize Y Values
    Y_values = normalizer.fit_transform(Y.values)
    Y = pd.DataFrame(data=Y_values, index=Y.index, columns=Y.columns)

    return X, Y

@task
def get_similarity_scores(X_ref: pd.DataFrame, Y_compare: pd.DataFrame) -> Dict:
    """Compute similarity measures between the reference and density cubes."""
    # RV Coefficient
    rv_results = rv_coefficient(X_ref, Y_compare)

    # RBIG information-theoretic measures (entropies, mutual information)
    rbig_results = rbig_it_measures(X_ref, Y_compare)

    return {**rv_results, **rbig_results}

Experiment Run

logger.setLevel(logging.INFO)

with Flow("Experiment-Step") as flow:

    # ======================
    # experiment parameters
    # ======================
    variable = Parameter("variable", default='gross_primary_productivity')
    region = Parameter("region", default=get_europe())
    period = Parameter("period", default=get_smoke_test_time())
    spatial = Parameter("spatial", default=1)
    temporal = Parameter("temporal", default=3)

    # ======================
    # experiment - Data
    # ======================
    # Get DataCube
    datacube = get_dataset(variable)

    # subset datacube (spatially)
    datacube = cube_spatial_subset(xr_data=datacube, bbox=region)[variable]

    # subset datacube (temporally)
    datacube = cube_temporal_subset(xr_data=datacube, period=period)

    # get datacubes
    reference_cube_df = get_reference_cube(data=datacube)

    # get density cubes
    density_cube_df = get_density_cubes(
        data=datacube, 
        spatial=spatial, 
        temporal=temporal
    )

    # get reference dataframe
    dfs = get_common_indices(
        reference_df=reference_cube_df, 
        density_df=density_cube_df
    )

    # standardize data
    dfs = standardizer_data(X=dfs[0], Y=dfs[1])

    # ======================
    # experiment - Methods
    # ======================
    res = get_similarity_scores(X_ref=dfs[0], Y_compare=dfs[1])
state = flow.run()
[2020-05-01 10:16:21] INFO - prefect.FlowRunner | Beginning Flow run for 'Experiment-Step'
[2020-05-01 10:16:21] INFO - prefect.FlowRunner | Starting flow run.
[2020-05-01 10:16:21] INFO - prefect.TaskRunner | Task 'variable': Starting task run...
[2020-05-01 10:16:21] INFO - prefect.TaskRunner | Task 'variable': finished task run for task with final state: 'Success'
[2020-05-01 10:16:21] INFO - prefect.TaskRunner | Task 'region': Starting task run...
[2020-05-01 10:16:21] INFO - prefect.TaskRunner | Task 'region': finished task run for task with final state: 'Success'
[2020-05-01 10:16:21] INFO - prefect.TaskRunner | Task 'period': Starting task run...
[2020-05-01 10:16:21] INFO - prefect.TaskRunner | Task 'period': finished task run for task with final state: 'Success'
[2020-05-01 10:16:21] INFO - prefect.TaskRunner | Task 'temporal': Starting task run...
[2020-05-01 10:16:21] INFO - prefect.TaskRunner | Task 'temporal': finished task run for task with final state: 'Success'
[2020-05-01 10:16:21] INFO - prefect.TaskRunner | Task 'get_dataset': Starting task run...
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'get_dataset': finished task run for task with final state: 'Success'
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'cube_spatial_subset': Starting task run...
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'cube_spatial_subset': finished task run for task with final state: 'Success'
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'GetItem': Starting task run...
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'GetItem': finished task run for task with final state: 'Success'
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'cube_temporal_subset': Starting task run...
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'cube_temporal_subset': finished task run for task with final state: 'Success'
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'get_reference_cube': Starting task run...
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'get_reference_cube': finished task run for task with final state: 'Success'
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'spatial': Starting task run...
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'spatial': finished task run for task with final state: 'Success'
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'get_density_cubes': Starting task run...
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'get_density_cubes': finished task run for task with final state: 'Success'
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'get_common_indices': Starting task run...
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'get_common_indices': finished task run for task with final state: 'Success'
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'GetItem': Starting task run...
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'GetItem': finished task run for task with final state: 'Success'
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'GetItem': Starting task run...
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'GetItem': finished task run for task with final state: 'Success'
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'standardizer_data': Starting task run...
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'standardizer_data': finished task run for task with final state: 'Success'
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'GetItem': Starting task run...
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'GetItem': finished task run for task with final state: 'Success'
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'GetItem': Starting task run...
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'GetItem': finished task run for task with final state: 'Success'
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'get_similarity_scores': Starting task run...
[2020-05-01 10:16:50] INFO - prefect.TaskRunner | Task 'get_similarity_scores': finished task run for task with final state: 'Success'
[2020-05-01 10:16:50] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
state.result[res].result
{'rv_coeff': 0.9697258,
 'rv_x_norm': 26692.072,
 'rv_y_norm': 77907.49,
 'rv_xy_norm': 2016556900.0,
 'rbig_H_x': 1.855240533094599,
 'rbig_H_y': 1.1286197933913034,
 'rbig_I_xy': 5.499353957238775,
 'rbig_vi_coeff': 3.8004736863738287}
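
Because everything above is parameterized, the same flow can be re-run for other configurations without touching the task definitions. A short sketch using the Prefect 1.x parameters argument (keys must match the Parameter names declared in the flow; the values here are just an example):

# re-run the flow with a different variable and density-cube size
state = flow.run(parameters={
    "variable": "root_moisture",
    "region": get_spain(),
    "spatial": 3,
    "temporal": 2,
})

# similarity scores for this configuration
print(state.result[res].result)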