Skip to content

Spatial-Temporal Experiment

In this notebook, I walk through how we can compare different similarity methods (the RV coefficient, CKA, and RBIG-based information measures) using the density cubes that we derive from the data.

import sys, os
from pyprojroot import here
# Resolve the project root via pyprojroot; ".here" is presumably the marker
# file that identifies the root directory — confirm against pyprojroot docs.
# NOTE(review): `root` is not referenced again in the visible part of this notebook.
root = here(project_files=[".here"])
# Put the project root on sys.path so the `src.*` imports further down resolve.
sys.path.append(str(here()))

import pathlib

# standard python packages
import xarray as xr
import pandas as pd
import numpy as np

# 
# Experiment Functions
from src.data.esdc import get_dataset
from src.features import Metrics
from src.features.temporal import select_period, get_smoke_test_time, TimePeriod
from src.features.spatial import select_region, get_europe, get_spain
from src.models.train_models import get_similarity_scores
from src.experiments.utils import dict_product, run_parallel_step

from src.features import Metrics
from src.features.density import get_density_cubes
from src.features.preprocessing import standardizer_data, get_reference_cube, get_common_indices
from src.models.similarity import cka_coefficient, rv_coefficient, rbig_it_measures

# # esdc tools
# from src.esdc.subset import select_pixel
# from src.esdc.shape import ShapeFileExtract, rasterize
# from esdc.transform import DensityCubes

from typing import Dict, List, Tuple
import xarray as xr

from tqdm import tqdm

import cartopy
import cartopy.crs as ccrs

# NUMPY SETTINGS
import numpy as onp
onp.set_printoptions(precision=3, suppress=True)

# MATPLOTLIB Settings
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

# SEABORN SETTINGS
import seaborn as sns
sns.set_context(context='talk',font_scale=0.7)
# sns.set(rc={'figure.figsize': (12, 9.)})
# sns.set_style("whitegrid")

# PANDAS SETTINGS
import pandas as pd
pd.set_option("display.max_rows", 120)
pd.set_option("display.max_columns", 120)

# LOGGING SETTINGS
import sys
import logging
# Configure logging once for the notebook: INFO and above, written to stdout,
# one record per line formatted as "<time>:<level>:<message>".
logging.basicConfig(
    level=logging.INFO, 
    stream=sys.stdout,
    format='%(asctime)s:%(levelname)s:%(message)s'
)
# getLogger() with no name returns the root logger; its level is adjusted
# again later in the notebook via logger.setLevel(...).
logger = logging.getLogger()
#logger.setLevel(logging.INFO)

%load_ext autoreload
%autoreload 2

Experiment Steps

Global Variables

Parameters

# Grid of experiment settings. Each key maps to the list of values to try;
# `dict_product` expands the grid into a list of per-run configurations.
parameters = {
    'variable': [
        'gross_primary_productivity',
        'leaf_area_index',
    ],
    'region': ["world"],
    'period': [
        TimePeriod(name="201001_201012", start="Jan-2010", end="Dec-2010"),
    ],
    # spatial / temporal settings 1..6, forwarded to get_density_cubes
    'spatial': list(range(1, 7)),
    'temporal': list(range(1, 7)),
}

# Materialise the configurations so they can be counted and indexed.
params = list(dict_product(parameters))
print(len(params))

# Flag passed on to `step` when running a single configuration.
smoke_test = True
72

Experimental Step

def step(params: Dict, smoke_test: bool=False):
    """Run a single experiment configuration and return its scores.

    The datacube for ``params['variable']`` is loaded, subset spatially and
    temporally, turned into a reference cube and a density cube, aligned on
    their common index, standardized, and finally compared with
    ``get_similarity_scores``.

    Parameters
    ----------
    params : Dict
        One configuration with the keys ``'variable'``, ``'region'``,
        ``'period'``, ``'spatial'`` and ``'temporal'``. ``'period'`` must
        expose a ``.name`` attribute. ``'region'`` may be an object with a
        ``.name`` attribute or a plain value such as the string ``"world"``.
    smoke_test : bool, default=False
        Forwarded unchanged to ``get_similarity_scores``.

    Returns
    -------
    results : pd.DataFrame
        A single-row frame (index ``[0]``) holding the configuration labels
        followed by every entry of the dict returned by
        ``get_similarity_scores``.
    """
    variable = params['variable']
    region = params['region']
    period = params['period']

    # ======================
    # experiment - Data
    # ======================
    # Get DataCube
    datacube = get_dataset(variable)

    # subset datacube (spatially), then keep only the variable of interest
    datacube = select_region(xr_data=datacube, bbox=region)[variable]

    # subset datacube (temporally)
    datacube = select_period(xr_data=datacube, period=period)

    # get reference cube
    reference_cube_df = get_reference_cube(data=datacube)

    # get density cubes
    density_cube_df = get_density_cubes(
        data=datacube,
        spatial=params['spatial'],
        temporal=params['temporal']
    )

    # keep only the samples present in both dataframes
    X, Y = get_common_indices(
        reference_df=reference_cube_df,
        density_df=density_cube_df
    )

    # standardize data
    X, Y = standardizer_data(X=X, Y=Y)

    # ======================
    # experiment - Methods
    # ======================
    res = get_similarity_scores(X_ref=X, Y_compare=Y, smoke_test=smoke_test)

    # The parameter grid in this notebook passes the region as a plain string
    # ("world"), which has no `.name`; fall back to its string form instead of
    # raising AttributeError after all the expensive work has been done.
    region_label = getattr(region, 'name', str(region))

    # Save Results
    results = pd.DataFrame({
        'region': region_label,
        'period': period.name,
        'variable': variable,
        'spatial': params['spatial'],
        'temporal': params['temporal'],
        **res
    }, index=[0])
    return results
# Run only the first configuration with smoke_test=True and display the
# resulting one-row dataframe.
results = step(params[0], smoke_test=True)
results
region period variable spatial temporal rv_coef x_norm y_norm xy_norm cka_coeff cka_y_norm cka_x_norm cka_xy_norm rbig_H_x rbig_H_y rbig_H_time rbig_I_xy rbig_I_time rbig_I_xx rbig_Ixx_time
0 spain 201001_201012 root_moisture 1 1 1.0 78.507996 78.507996 6163.505371 1.0 33.539061 33.539061 1124.868597 1.957913 1.957913 0.27093 24.79073 0.719313 24.79073 0.602488
t = get_spain()
from xcube.
shapely.geometry.polygon.Polygon
res
{'rv_coeff': 0.9403951,
 'rv_x_norm': 44288.883,
 'rv_y_norm': 357065.0,
 'rv_xy_norm': 14871418000.0,
 'rv_time': 38.40082359313965,
 'rbig_H_x': 1.9247062049009207,
 'rbig_H_y': 4.4428976758211896,
 'rbig_H_time': 3.0350914001464844,
 'rbig_I_xy': 3.649542912651551,
 'rbig_I_time': 31.18929934501648,
 'rbig_vi_coeff': 1.2480244562128495}
res
{'rv_coeff': 0.9697245,
 'rv_x_norm': 26692.102,
 'rv_y_norm': 77907.13,
 'rv_xy_norm': 2016547100.0,
 'rbig_H_x': 1.855240533094599,
 'rbig_H_y': 1.0902273375895914,
 'rbig_I_xy': 5.405821100129361,
 'rbig_vi_coeff': 3.801045104354525}
from prefect import task, Flow, Parameter

@task # get Dataset
def get_dataset(variable: str)-> xr.Dataset:
    """Open the zarr store and keep a single data variable.

    The store location is taken from ``filename``, a name that is not
    defined inside this function and must exist in the enclosing scope.
    The double-bracket selection keeps the result a Dataset.
    """
    store = xr.open_zarr(str(filename))
    return store[[variable]]

@task # subset datacube
def cube_spatial_subset(xr_data: xr.Dataset, bbox: Region) -> xr.Dataset:
    """Clip ``xr_data`` to the rectangle described by ``bbox``.

    ``bbox`` must expose ``lonmin``, ``latmin``, ``lonmax`` and ``latmax``;
    these are turned into a shapely box which is handed to
    ``clip_dataset_by_geometry`` together with the dataset.
    """
    # build the rectangle geometry from the region's corner coordinates
    rectangle = shapely.geometry.box(
        bbox.lonmin, bbox.latmin, bbox.lonmax, bbox.latmax
    )
    return clip_dataset_by_geometry(xr_data, rectangle)

@task
def cube_temporal_subset(xr_data: xr.DataArray, period: TimePeriod) -> xr.DataArray:
    """Temporally subset an xarray object to a time period.

    Parameters
    ----------
    xr_data : xr.DataArray
        Data with a ``time`` coordinate.
    period : TimePeriod
        Object exposing ``start`` and ``end`` attributes. A plain
        ``(start, end)`` tuple is not accepted because the bounds are read
        as attributes.

    Returns
    -------
    xr.DataArray
        The selection ``time=slice(period.start, period.end)`` of ``xr_data``.
    """
    return xr_data.sel(time=slice(period.start, period.end))

@task # get reference cube
def get_reference_cube(data: xr.DataArray) -> pd.DataFrame:
    """Convert the data array to a dataframe for use as the reference cube.

    Rows containing NaNs are dropped and the index levels are reordered
    according to ``levels``, which must be defined in the enclosing scope.
    """
    frame = data.to_dataframe()
    frame = frame.dropna()
    return frame.reorder_levels(levels)

@task # get density cubes
def get_density_cubes(data: xr.DataArray, spatial: int, temporal: int) -> pd.DataFrame:
    """Build the density (mini)cubes of ``data`` as a dataframe.

    ``spatial`` and ``temporal`` are passed to ``DensityCubes`` as
    ``spatial_window`` and ``time_window``. The result's index levels are
    reordered according to ``levels`` from the enclosing scope.
    """
    extractor = DensityCubes(spatial_window=spatial, time_window=temporal)
    minicubes = extractor.get_minicubes(data)
    return minicubes.reorder_levels(levels)

@task # get common indices
def get_common_indices(
    reference_df: pd.DataFrame, density_df: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Restrict both dataframes to the index labels they share.

    Returns the reference rows first and the density rows second, both
    selected with the intersection of the two indices.
    """
    shared = density_df.index.intersection(reference_df.index)
    reference_common = reference_df.loc[shared, :]
    density_common = density_df.loc[shared, :]
    return reference_common, density_common

@task # standardize the data before
def standardizer_data(X: pd.DataFrame, Y: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Standardize ``X`` and ``Y`` independently.

    A single ``StandardScaler`` (centering and scaling both enabled) is
    re-fitted on each input in turn. The scaled values are wrapped back
    into dataframes that keep the original index and column labels.
    """
    scaler = StandardScaler(with_mean=True, with_std=True)

    # fit on X and scale it
    X_scaled = pd.DataFrame(
        data=scaler.fit_transform(X.values), index=X.index, columns=X.columns
    )

    # re-fit on Y and scale it
    Y_scaled = pd.DataFrame(
        data=scaler.fit_transform(Y.values), index=Y.index, columns=Y.columns
    )

    return X_scaled, Y_scaled

@task
def get_similarity_scores(X_ref: pd.DataFrame, Y_compare: pd.DataFrame) -> Dict:
    """Score the similarity between the reference and comparison data.

    Computes the RV-coefficient results and the RBIG information measures
    and merges both result dicts into one. On a key collision the RBIG
    entry wins because it is unpacked last. The CKA coefficient is
    currently disabled.
    """
    # RV coefficient first, then RBIG (same evaluation order as before)
    rv_scores = rv_coefficient(X_ref, Y_compare)
    rbig_scores = rbig_it_measures(X_ref, Y_compare)

    return {**rv_scores, **rbig_scores}

Experiment Run

# variable = 'gross_primary_productivity'
# region = get_europe()

# datacube = get_dataset(variable)

# datacube = subset_cube(xr_data=datacube, bbox=region)
logger.setLevel(logging.INFO)

# Declare the experiment as a Prefect flow named "Experiment-Step". The calls
# inside the `with` block are made on @task-decorated functions and Parameter
# objects; the run log printed after `flow.run()` shows them being executed
# as individual tasks at that point.
with Flow("Experiment-Step") as flow:

    # ======================
    # experiment parameters
    # ======================
    # Flow inputs, each with a default value used when none is supplied.
    variable = Parameter("variable", default='gross_primary_productivity')
    region = Parameter("region", default=get_europe())
    # NOTE(review): `get_test_time` is not among the names imported in the
    # visible part of this notebook (`get_smoke_test_time` is) — confirm which
    # helper is intended before re-running this cell.
    period = Parameter("period", default=get_test_time())
    spatial = Parameter("spatial", default=1)
    temporal = Parameter("temporal", default=3)

    # ======================
    # experiment - Data
    # ======================
    # Get DataCube
    datacube = get_dataset(variable)

    # subset datacube (spatially); the `[variable]` indexing appears as a
    # separate 'GetItem' task in the run log
    datacube = cube_spatial_subset(xr_data=datacube, bbox=region)[variable]

    # subset datacube (temporally)
    datacube = cube_temporal_subset(xr_data=datacube, period=period)

    # get datacubes
    reference_cube_df = get_reference_cube(data=datacube)

    # get density cubes
    density_cube_df = get_density_cubes(
        data=datacube, 
        spatial=spatial, 
        temporal=temporal
    )

    # get reference dataframe
    # (the result is kept as one object and indexed with [0] / [1] below
    # rather than unpacked into two names)
    dfs = get_common_indices(
        reference_df=reference_cube_df, 
        density_df=density_cube_df
    )

    # standardize data
    dfs = standardizer_data(X=dfs[0], Y=dfs[1])

    # ======================
    # experiment - Methods
    # ======================
    # `res` is used after the run as the key for looking up the scores in
    # the returned state.
    res = get_similarity_scores(X_ref=dfs[0], Y_compare=dfs[1])
# Execute the flow with the default parameter values and keep the final state.
state = flow.run()
[2020-05-01 10:16:21] INFO - prefect.FlowRunner | Beginning Flow run for 'Experiment-Step'
2020-05-01 12:16:21,361:INFO:Beginning Flow run for 'Experiment-Step'
[2020-05-01 10:16:21] INFO - prefect.FlowRunner | Starting flow run.
2020-05-01 12:16:21,372:INFO:Starting flow run.
[2020-05-01 10:16:21] INFO - prefect.TaskRunner | Task 'variable': Starting task run...
2020-05-01 12:16:21,411:INFO:Task 'variable': Starting task run...
[2020-05-01 10:16:21] INFO - prefect.TaskRunner | Task 'variable': finished task run for task with final state: 'Success'
2020-05-01 12:16:21,424:INFO:Task 'variable': finished task run for task with final state: 'Success'
[2020-05-01 10:16:21] INFO - prefect.TaskRunner | Task 'region': Starting task run...
2020-05-01 12:16:21,454:INFO:Task 'region': Starting task run...
[2020-05-01 10:16:21] INFO - prefect.TaskRunner | Task 'region': finished task run for task with final state: 'Success'
2020-05-01 12:16:21,467:INFO:Task 'region': finished task run for task with final state: 'Success'
[2020-05-01 10:16:21] INFO - prefect.TaskRunner | Task 'period': Starting task run...
2020-05-01 12:16:21,496:INFO:Task 'period': Starting task run...
[2020-05-01 10:16:21] INFO - prefect.TaskRunner | Task 'period': finished task run for task with final state: 'Success'
2020-05-01 12:16:21,509:INFO:Task 'period': finished task run for task with final state: 'Success'
[2020-05-01 10:16:21] INFO - prefect.TaskRunner | Task 'temporal': Starting task run...
2020-05-01 12:16:21,539:INFO:Task 'temporal': Starting task run...
[2020-05-01 10:16:21] INFO - prefect.TaskRunner | Task 'temporal': finished task run for task with final state: 'Success'
2020-05-01 12:16:21,551:INFO:Task 'temporal': finished task run for task with final state: 'Success'
[2020-05-01 10:16:21] INFO - prefect.TaskRunner | Task 'get_dataset': Starting task run...
2020-05-01 12:16:21,581:INFO:Task 'get_dataset': Starting task run...
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'get_dataset': finished task run for task with final state: 'Success'
2020-05-01 12:16:22,485:INFO:Task 'get_dataset': finished task run for task with final state: 'Success'
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'cube_spatial_subset': Starting task run...
2020-05-01 12:16:22,505:INFO:Task 'cube_spatial_subset': Starting task run...
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'cube_spatial_subset': finished task run for task with final state: 'Success'
2020-05-01 12:16:22,525:INFO:Task 'cube_spatial_subset': finished task run for task with final state: 'Success'
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'GetItem': Starting task run...
2020-05-01 12:16:22,545:INFO:Task 'GetItem': Starting task run...
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'GetItem': finished task run for task with final state: 'Success'
2020-05-01 12:16:22,554:INFO:Task 'GetItem': finished task run for task with final state: 'Success'
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'cube_temporal_subset': Starting task run...
2020-05-01 12:16:22,574:INFO:Task 'cube_temporal_subset': Starting task run...
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'cube_temporal_subset': finished task run for task with final state: 'Success'
2020-05-01 12:16:22,585:INFO:Task 'cube_temporal_subset': finished task run for task with final state: 'Success'
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'get_reference_cube': Starting task run...
2020-05-01 12:16:22,605:INFO:Task 'get_reference_cube': Starting task run...
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'get_reference_cube': finished task run for task with final state: 'Success'
2020-05-01 12:16:22,653:INFO:Task 'get_reference_cube': finished task run for task with final state: 'Success'
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'spatial': Starting task run...
2020-05-01 12:16:22,687:INFO:Task 'spatial': Starting task run...
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'spatial': finished task run for task with final state: 'Success'
2020-05-01 12:16:22,700:INFO:Task 'spatial': finished task run for task with final state: 'Success'
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'get_density_cubes': Starting task run...
2020-05-01 12:16:22,729:INFO:Task 'get_density_cubes': Starting task run...
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'get_density_cubes': finished task run for task with final state: 'Success'
2020-05-01 12:16:22,769:INFO:Task 'get_density_cubes': finished task run for task with final state: 'Success'
[2020-05-01 10:16:22] INFO - prefect.TaskRunner | Task 'get_common_indices': Starting task run...
2020-05-01 12:16:22,799:INFO:Task 'get_common_indices': Starting task run...
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'get_common_indices': finished task run for task with final state: 'Success'
2020-05-01 12:16:23,766:INFO:Task 'get_common_indices': finished task run for task with final state: 'Success'
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'GetItem': Starting task run...
2020-05-01 12:16:23,786:INFO:Task 'GetItem': Starting task run...
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'GetItem': finished task run for task with final state: 'Success'
2020-05-01 12:16:23,795:INFO:Task 'GetItem': finished task run for task with final state: 'Success'
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'GetItem': Starting task run...
2020-05-01 12:16:23,815:INFO:Task 'GetItem': Starting task run...
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'GetItem': finished task run for task with final state: 'Success'
2020-05-01 12:16:23,824:INFO:Task 'GetItem': finished task run for task with final state: 'Success'
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'standardizer_data': Starting task run...
2020-05-01 12:16:23,843:INFO:Task 'standardizer_data': Starting task run...
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'standardizer_data': finished task run for task with final state: 'Success'
2020-05-01 12:16:23,855:INFO:Task 'standardizer_data': finished task run for task with final state: 'Success'
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'GetItem': Starting task run...
2020-05-01 12:16:23,874:INFO:Task 'GetItem': Starting task run...
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'GetItem': finished task run for task with final state: 'Success'
2020-05-01 12:16:23,883:INFO:Task 'GetItem': finished task run for task with final state: 'Success'
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'GetItem': Starting task run...
2020-05-01 12:16:23,903:INFO:Task 'GetItem': Starting task run...
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'GetItem': finished task run for task with final state: 'Success'
2020-05-01 12:16:23,912:INFO:Task 'GetItem': finished task run for task with final state: 'Success'
[2020-05-01 10:16:23] INFO - prefect.TaskRunner | Task 'get_similarity_scores': Starting task run...
2020-05-01 12:16:23,931:INFO:Task 'get_similarity_scores': Starting task run...
[2020-05-01 10:16:50] INFO - prefect.TaskRunner | Task 'get_similarity_scores': finished task run for task with final state: 'Success'
2020-05-01 12:16:50,094:INFO:Task 'get_similarity_scores': finished task run for task with final state: 'Success'
[2020-05-01 10:16:50] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
2020-05-01 12:16:50,097:INFO:Flow run SUCCESS: all reference tasks succeeded
# Fetch the output of the `res` task (the get_similarity_scores call) from the
# state returned by flow.run() and display it.
state.result[res].result
{'rv_coeff': 0.9697258,
 'rv_x_norm': 26692.072,
 'rv_y_norm': 77907.49,
 'rv_xy_norm': 2016556900.0,
 'rbig_H_x': 1.855240533094599,
 'rbig_H_y': 1.1286197933913034,
 'rbig_I_xy': 5.499353957238775,
 'rbig_vi_coeff': 3.8004736863738287}