Spatial-Temporal Experiment¶
In this notebook, I will walk through how we can evaluate different similarity methods based on the density cubes that we derive.
import sys, os
from pyprojroot import here
root = here(project_files=[".here"])
sys.path.append(str(here()))
import pathlib
# standard python packages
import xarray as xr
import pandas as pd
import numpy as np
from xcube.core.geom import clip_dataset_by_geometry
#
from src.features import Metrics
from src.features.preprocessing import DensityCubes
from sklearn.preprocessing import StandardScaler
# # esdc tools
# from src.esdc.subset import select_pixel
# from src.esdc.shape import ShapeFileExtract, rasterize
# from esdc.transform import DensityCubes
from typing import List, Dict
import xarray as xr
from tqdm import tqdm
import cartopy
import cartopy.crs as ccrs
# NUMPY SETTINGS
import numpy as onp
onp.set_printoptions(precision=3, suppress=True)
# MATPLOTLIB Settings
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
# SEABORN SETTINGS
import seaborn as sns
sns.set_context(context='talk',font_scale=0.7)
# sns.set(rc={'figure.figsize': (12, 9.)})
# sns.set_style("whitegrid")
# PANDAS SETTINGS
import pandas as pd
pd.set_option("display.max_rows", 120)
pd.set_option("display.max_columns", 120)
# LOGGING SETTINGS
import sys
import logging
# Log INFO and above to stdout so messages render inline in notebook cells.
logging.basicConfig(
    level=logging.INFO,
    stream=sys.stdout,
    format='%(asctime)s:%(levelname)s:%(message)s'
)
# Root logger handle; its level is adjusted again before the flow is run.
logger = logging.getLogger()
#logger.setLevel(logging.INFO)
# IPython: automatically reload edited project modules on every cell execution.
%load_ext autoreload
%autoreload 2
Experiment Steps¶
Global Variables¶
from collections import namedtuple
from typing import Union, Tuple
import xarray as xr
import rioxarray
import shapely
# global variables
# Datapath
# Root directory holding the Earth System Data Cube (ESDC) zarr store.
DATA_PATH = pathlib.Path("/media/disk/databases/ESDC/")
# Multi-index level order used for every dataframe built below.
levels = ['time', 'lon', 'lat']
# get filename
# Presumably an 8-daily, 0.25-degree global cube (inferred from the zarr name).
filename = DATA_PATH.joinpath("esdc-8d-0.25deg-1x720x1440-2.0.0.zarr")
# Lightweight records for a spatial bounding box and a time window.
Region = namedtuple("Region", ["name", "lonmin", "lonmax", "latmin", "latmax"])
TimePeriod = namedtuple("TimePeriod", ["name", "start", "end"])


def get_test_time() -> TimePeriod:
    """Return the evaluation time period (the full year 2010)."""
    return TimePeriod(name="201001_201012", start="Jan-2010", end="Dec-2010")
    # return TimePeriod(name='test_201007', start='July-2010', end='July-2010')


def get_europe() -> Region:
    """As an example, I often choose Europe. This is a decent bounding box.

    FIX: latmin/latmax were previously swapped (latmax=35.5, latmin=71.5).
    shapely's ``box`` normalizes bounds so downstream clipping still worked,
    but the field values were semantically wrong: Europe spans ~35.5N-71.5N.
    """
    return Region(name="europe", lonmin=-18.0, lonmax=60.0, latmin=35.5, latmax=71.5)
Parameters¶
# ESDC variables the experiment is run over (one datacube per variable).
variables = [
    'gross_primary_productivity',
    'root_moisture',
    'land_surface_temperature'
]
Functions¶
from prefect import task, Flow, Parameter
@task  # get Dataset
def get_dataset(variable: str) -> xr.Dataset:
    """Open the ESDC zarr store and keep only the requested variable."""
    cube = xr.open_zarr(str(filename))
    return cube[[variable]]
@task  # subset datacube
def cube_spatial_subset(xr_data: xr.Dataset, bbox: Region) -> xr.Dataset:
    """Function to spatially subset an xarray dataset from a bounding box.

    Parameters
    ----------
    xr_data : xr.Dataset
        Dataset with lon/lat coordinates to clip.
    bbox : Region
        Named bounding box (lonmin, latmin, lonmax, latmax fields are used).

    FIX: the geometry is bound to a new name instead of reassigning (and
    shadowing) the ``bbox`` Region parameter as the original did.
    """
    # get bounding box geometry (shapely box takes minx, miny, maxx, maxy)
    geometry = shapely.geometry.box(
        bbox.lonmin,
        bbox.latmin,
        bbox.lonmax,
        bbox.latmax
    )
    # subset datacube
    return clip_dataset_by_geometry(xr_data, geometry)
@task
def cube_temporal_subset(xr_data: xr.DataArray, period: TimePeriod) -> xr.DataArray:
    """Function to temporally subset an xarray dataset from a time period.

    FIX: the annotation previously claimed ``period: Tuple[str, str]`` but the
    body accesses ``period.start`` / ``period.end`` — i.e. it requires a
    ``TimePeriod`` namedtuple, not a plain tuple.
    """
    return xr_data.sel(time=slice(period.start, period.end))
@task  # get reference cube
def get_reference_cube(data: xr.DataArray) -> pd.DataFrame:
    """Wrapper Function to get reference cube"""
    frame = data.to_dataframe()
    frame = frame.dropna()
    # order the multi-index as (time, lon, lat) like the density cubes
    return frame.reorder_levels(levels)
@task  # get density cubes
def get_density_cubes(data: xr.DataArray, spatial: int, temporal: int) -> pd.DataFrame:
    """Wrapper Function to get density cubes from a dataarray"""
    cuber = DensityCubes(spatial_window=spatial, time_window=temporal)
    minicubes = cuber.get_minicubes(data)
    # order the multi-index as (time, lon, lat) like the reference cube
    return minicubes.reorder_levels(levels)
@task  # get common indices
def get_common_indices(
    reference_df: pd.DataFrame, density_df: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Align both dataframes on the (time, lon, lat) indices they share."""
    shared = density_df.index.intersection(reference_df.index)
    return reference_df.loc[shared, :], density_df.loc[shared, :]
@task  # standardize the data before
def standardizer_data(X: pd.DataFrame, Y: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Standardize X and Y to zero mean / unit variance, keeping index+columns."""
    scaler = StandardScaler(with_mean=True, with_std=True)
    # fit_transform refits the scaler per call, so X and Y are scaled independently
    X = pd.DataFrame(
        data=scaler.fit_transform(X.values), index=X.index, columns=X.columns
    )
    Y = pd.DataFrame(
        data=scaler.fit_transform(Y.values), index=Y.index, columns=Y.columns
    )
    return X, Y
@task
def get_similarity_scores(X_ref: pd.DataFrame, Y_compare: pd.DataFrame) -> Dict:
    """Compute similarity measures between the reference and comparison data."""
    results = {}
    # RV Coefficient
    results.update(rv_coefficient(X_ref, Y_compare))
    # # CKA Coefficient
    # results.update(cka_coefficient(X_ref, Y_compare))
    # RBIG Coefficient
    results.update(rbig_it_measures(X_ref, Y_compare))
    return results
from src.models.similarity import cka_coefficient, rv_coefficient, rbig_it_measures
Experiment Run¶
# variable = 'gross_primary_productivity'
# region = get_europe()
# datacube = get_dataset(variable)
# datacube = subset_cube(xr_data=datacube, bbox=region)
logger.setLevel(logging.INFO)
# Build the Prefect flow: each @task call below wires a node into the DAG;
# nothing executes until flow.run() at the bottom.
with Flow("Experiment-Step") as flow:
    # ======================
    # experiment parameters
    # ======================
    variable = Parameter("variable", default='gross_primary_productivity')
    region = Parameter("region", default=get_europe())
    period = Parameter("period", default=get_test_time())
    spatial = Parameter("spatial", default=1)
    temporal = Parameter("temporal", default=3)
    # ======================
    # experiment - Data
    # ======================
    # Get DataCube
    datacube = get_dataset(variable)
    # subset datacube (spatially)
    datacube = cube_spatial_subset(xr_data=datacube, bbox=region)[variable]
    # subset datacube (temporally)
    datacube = cube_temporal_subset(xr_data=datacube, period=period)
    # get datacubes
    reference_cube_df = get_reference_cube(data=datacube)
    # get density cubes
    density_cube_df = get_density_cubes(
        data=datacube,
        spatial=spatial,
        temporal=temporal
    )
    # get reference dataframe
    dfs = get_common_indices(
        reference_df=reference_cube_df,
        density_df=density_cube_df
    )
    # standardize data
    dfs = standardizer_data(X=dfs[0], Y=dfs[1])
    # ======================
    # experiment - Methods
    # ======================
    res = get_similarity_scores(X_ref=dfs[0], Y_compare=dfs[1])
# Execute the flow and pull out the similarity-score dict of the final task.
state = flow.run()
# NOTE(review): indexing State.result by the task object is prefect 1.x
# behavior — confirm the installed prefect version matches.
state.result[res].result