Spatial-Temporal Experiment¶
In this notebook, I walk through how we can evaluate several similarity methods on the density cubes that we derive from the Earth System Data Cube.
import sys, os
from pyprojroot import here
root = here(project_files=[".here"])
sys.path.append(str(here()))
import pathlib
# standard python packages
import xarray as xr
import pandas as pd
import numpy as np
#
# Experiment Functions
from src.data.esdc import get_dataset
from src.features import Metrics
from src.features.temporal import select_period, get_smoke_test_time, TimePeriod
from src.features.spatial import select_region, get_europe, get_spain
from src.models.train_models import get_similarity_scores
from src.experiments.utils import dict_product, run_parallel_step
from src.features import Metrics
from src.features.density import get_density_cubes
from src.features.preprocessing import standardizer_data, get_reference_cube, get_common_indices
from src.models.similarity import cka_coefficient, rv_coefficient, rbig_it_measures
# # esdc tools
# from src.esdc.subset import select_pixel
# from src.esdc.shape import ShapeFileExtract, rasterize
# from esdc.transform import DensityCubes
from typing import List, Dict
import xarray as xr
from tqdm import tqdm
import cartopy
import cartopy.crs as ccrs
# NUMPY SETTINGS
import numpy as onp
onp.set_printoptions(precision=3, suppress=True)
# MATPLOTLIB Settings
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
# SEABORN SETTINGS
import seaborn as sns
sns.set_context(context='talk',font_scale=0.7)
# sns.set(rc={'figure.figsize': (12, 9.)})
# sns.set_style("whitegrid")
# PANDAS SETTINGS
import pandas as pd
pd.set_option("display.max_rows", 120)
pd.set_option("display.max_columns", 120)
# LOGGING SETTINGS
import sys
import logging
logging.basicConfig(
level=logging.INFO,
stream=sys.stdout,
format='%(asctime)s:%(levelname)s:%(message)s'
)
logger = logging.getLogger()
#logger.setLevel(logging.INFO)
%load_ext autoreload
%autoreload 2
Experiment Steps¶
Global Variables¶
Parameters¶
# ----------------------------------------------------------------------
# Experiment parameter grid. `dict_product` (src.experiments.utils) takes
# this dict of lists and yields one dict per combination, i.e. the
# cartesian product of all parameter values.
# ----------------------------------------------------------------------
parameters = {}
# datacube variables to compare against each other
parameters['variable'] = [
    'gross_primary_productivity',
    'leaf_area_index'
]
# NOTE(review): `step()` later reads params['region'].name, but "world" is a
# plain str with no `.name` attribute -- confirm select_region and the
# results table handle string regions.
parameters['region'] = ["world"]
parameters['period'] = [TimePeriod(name="201001_201012", start="Jan-2010", end="Dec-2010")]
# spatial window sizes (pixels) and temporal window sizes (time steps)
# used when extracting the density cubes
parameters['spatial'] = [1, 2, 3, 4, 5, 6]
parameters['temporal'] = [1, 2, 3, 4, 5, 6]
# params = list(dict_product(parameters))
params = list(dict_product(parameters))
print(len(params))
# when True, downstream estimators run cheap/truncated versions
smoke_test = True
Experimental Step¶
def step(params: Dict, smoke_test: bool = False) -> pd.DataFrame:
    """Run one experiment configuration end-to-end.

    Parameters
    ----------
    params : Dict
        One entry from the parameter grid with keys 'variable', 'region',
        'period', 'spatial', 'temporal'.
    smoke_test : bool, default False
        Forwarded to `get_similarity_scores` to run a cheap version.

    Returns
    -------
    pd.DataFrame
        A single-row dataframe with the configuration and the similarity
        scores merged in.
    """
    # ======================
    # experiment - Data
    # ======================
    # Get DataCube for the requested variable
    datacube = get_dataset(params['variable'])
    # subset datacube (spatially)
    datacube = select_region(
        xr_data=datacube, bbox=params['region']
    )[params['variable']]
    # subset datacube (temporally)
    datacube = select_period(xr_data=datacube, period=params['period'])
    # reference cube: the raw (un-windowed) values as a dataframe
    reference_cube_df = get_reference_cube(data=datacube)
    # density cubes: spatio-temporal mini-cubes of the requested window sizes
    density_cube_df = get_density_cubes(
        data=datacube,
        spatial=params['spatial'],
        temporal=params['temporal'],
    )
    # align the two dataframes on the indices they share
    X, Y = get_common_indices(
        reference_df=reference_cube_df,
        density_df=density_cube_df,
    )
    # standardize data
    X, Y = standardizer_data(X=X, Y=Y)
    # ======================
    # experiment - Methods
    # ======================
    res = get_similarity_scores(X_ref=X, Y_compare=Y, smoke_test=smoke_test)
    # Robustness fix: 'region'/'period' may be objects with a `.name`
    # attribute (e.g. TimePeriod) or plain strings (e.g. "world" in the
    # parameter grid above); the original `params['region'].name` raised
    # AttributeError for strings.
    region_name = getattr(params['region'], 'name', params['region'])
    period_name = getattr(params['period'], 'name', params['period'])
    # Save Results as a single-row dataframe
    results = pd.DataFrame({
        'region': region_name,
        'period': period_name,
        'variable': params['variable'],
        'spatial': params['spatial'],
        'temporal': params['temporal'],
        **res
    }, index=[0])
    return results
# Smoke-test run of a single experiment configuration.
results = step(params[0], smoke_test=True)
results
# NOTE(review): the lines below were leftover scratch work. `from xcube.`
# was an incomplete import statement (a SyntaxError), and `res` is not
# defined at notebook scope (it is local to `step`). They are commented
# out so the notebook stays runnable.
# t = get_spain()
# from xcube import ...
# res
from prefect import task, Flow, Parameter
@task # get Dataset
def get_dataset(variable: str)-> xr.Dataset:
    """Prefect task: open the zarr datacube and keep a single variable.

    NOTE(review): relies on a notebook/module-level `filename` that is not
    defined anywhere in this notebook as shown -- confirm it is set before
    this task runs.
    """
    return xr.open_zarr(str(filename))[[variable]]
@task # subset datacube
def cube_spatial_subset(xr_data: xr.Dataset, bbox: Region) -> xr.Dataset:
    """Prefect task: spatially clip a dataset to a bounding-box region.

    The `bbox` region (lonmin/latmin/lonmax/latmax attributes) is turned
    into a shapely box geometry and used to clip the datacube.
    """
    # build the clipping geometry from the region's bounds
    geometry = shapely.geometry.box(bbox.lonmin, bbox.latmin, bbox.lonmax, bbox.latmax)
    # subset datacube
    return clip_dataset_by_geometry(xr_data, geometry)
@task
def cube_temporal_subset(xr_data: xr.DataArray, period: TimePeriod) -> xr.DataArray:
    """Prefect task: temporally subset a DataArray to ``period.start``..``period.end``.

    Note: the annotation previously said ``Tuple[str, str]``, but the body
    reads ``.start``/``.end`` attributes, so a TimePeriod-like object is
    what is actually required.
    """
    return xr_data.sel(time=slice(period.start, period.end))
@task # get reference cube
def get_reference_cube(data: xr.DataArray) -> pd.DataFrame:
    """Wrapper Function to get reference cube.

    Converts the DataArray to a dataframe, drops missing rows, and reorders
    the MultiIndex levels.

    NOTE(review): `levels` is a global that is not defined in this notebook
    as shown -- confirm where it comes from before running.
    """
    return data.to_dataframe().dropna().reorder_levels(levels)
@task # get density cubes
def get_density_cubes(data: xr.DataArray, spatial: int, temporal: int) -> pd.DataFrame:
    """Wrapper Function to get density cubes from a dataarray.

    Extracts spatio-temporal mini-cubes of size ``spatial`` (pixels) by
    ``temporal`` (time steps) and reorders the MultiIndex levels.

    NOTE(review): the `DensityCubes` import is commented out at the top of
    this notebook, and `levels` is an unseen global -- confirm both are in
    scope before running.
    """
    return DensityCubes(
        spatial_window=spatial,
        time_window=temporal
    ).get_minicubes(data).reorder_levels(levels)
@task # get common indices
def get_common_indices(
    reference_df: pd.DataFrame, density_df: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Prefect task: restrict both dataframes to the index rows they share.

    The intersection is taken from the density cube's index so the row
    ordering of the result follows the density dataframe.
    """
    common_idx = density_df.index.intersection(reference_df.index)
    reference_subset = reference_df.loc[common_idx, :]
    density_subset = density_df.loc[common_idx, :]
    return reference_subset, density_subset
@task # standardize the data before
def standardizer_data(X: pd.DataFrame, Y: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Prefect task: z-score each column of X and Y independently.

    Each dataframe is fit and transformed on its own, and the original
    index/columns are restored on the scaled values.
    """
    normalizer = StandardScaler(with_mean=True, with_std=True)

    def _rescale(df: pd.DataFrame) -> pd.DataFrame:
        # scale the raw values, then rebuild the dataframe wrapper
        scaled = normalizer.fit_transform(df.values)
        return pd.DataFrame(data=scaled, index=df.index, columns=df.columns)

    return _rescale(X), _rescale(Y)
@task
def get_similarity_scores(X_ref: pd.DataFrame, Y_compare: pd.DataFrame) -> Dict:
    """Prefect task: collect similarity measures between the two dataframes.

    Currently gathers the RV coefficient and the RBIG information-theoretic
    measures; the CKA coefficient is disabled (kept commented out, as in
    the original notebook).
    """
    results = {}
    # RV Coefficient
    results.update(rv_coefficient(X_ref, Y_compare))
    # # CKA Coefficient
    # results.update(cka_coefficient(X_ref, Y_compare))
    # RBIG Coefficient
    results.update(rbig_it_measures(X_ref, Y_compare))
    return results
Experiment Run¶
# variable = 'gross_primary_productivity'
# region = get_europe()
# datacube = get_dataset(variable)
# datacube = subset_cube(xr_data=datacube, bbox=region)
logger.setLevel(logging.INFO)

# Build the prefect flow: each task below is wired into a DAG and only
# executed when `flow.run()` is called.
with Flow("Experiment-Step") as flow:
    # ======================
    # experiment parameters
    # ======================
    variable = Parameter("variable", default='gross_primary_productivity')
    region = Parameter("region", default=get_europe())
    # BUG FIX: the original called get_test_time(), which is never imported;
    # the helper imported at the top of this notebook is get_smoke_test_time().
    period = Parameter("period", default=get_smoke_test_time())
    spatial = Parameter("spatial", default=1)
    temporal = Parameter("temporal", default=3)
    # ======================
    # experiment - Data
    # ======================
    # Get DataCube
    datacube = get_dataset(variable)
    # subset datacube (spatially)
    datacube = cube_spatial_subset(xr_data=datacube, bbox=region)[variable]
    # subset datacube (temporally)
    datacube = cube_temporal_subset(xr_data=datacube, period=period)
    # reference cube (raw values as a dataframe)
    reference_cube_df = get_reference_cube(data=datacube)
    # density cubes (spatio-temporal mini-cubes)
    density_cube_df = get_density_cubes(
        data=datacube,
        spatial=spatial,
        temporal=temporal
    )
    # align both dataframes on their common indices
    dfs = get_common_indices(
        reference_df=reference_cube_df,
        density_df=density_cube_df
    )
    # standardize data
    dfs = standardizer_data(X=dfs[0], Y=dfs[1])
    # ======================
    # experiment - Methods
    # ======================
    res = get_similarity_scores(X_ref=dfs[0], Y_compare=dfs[1])

# execute the flow and pull the similarity-score task's result out of the state
state = flow.run()
state.result[res].result