In this case, we try to directly parameterize the field.
### Data Representation
```python
# get dataset (coordinate-based)
ds: xr.Dataset = ...
# convert to ML-ready tensors
x: Array["N"] = ds.x.values
t: Array["N"] = ds.t.values
y: Array["N D"] = ds.variable.values
```
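For concreteness, here is a minimal runnable sketch of this conversion, assuming a hypothetical toy `xr.Dataset` on a regular space-time grid:

```python
import numpy as np
import xarray as xr

# hypothetical toy dataset: one variable on a regular (t, x) grid
ds = xr.Dataset(
    {"variable": (("t", "x"), np.random.randn(5, 10))},
    coords={"t": np.linspace(0, 1, 5), "x": np.linspace(0, 1, 10)},
)

# flatten the grid into N = 5 * 10 coordinate/value pairs
stacked = ds.stack(N=("t", "x"))
x = stacked.x.values                  # (N,)
t = stacked.t.values                  # (N,)
y = stacked.variable.values[:, None]  # (N, D), with D = 1
```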
### Model
We assume that there is some underlying model which can be inferred from the spatiotemporal coordinates. In other words, we can write this as:

$$y = \boldsymbol{f}(x, t; \boldsymbol{\theta})$$
```python
# initialize parameterized function
params: Params = ...
fn: Model = Model(params)
# apply function
y_pred: Array["N D"] = fn(x, t)

# initialize Gaussian conditional likelihood
sigma: float = ...
model: ProbModel = Gaussian(mean_fn=fn, variance=sigma**2)
# apply model, N(y | f(x,t), sigma^2)
y_pred: Dist = model(x, t)
y_pred_mean: Array["N D"] = y_pred.mean()
y_pred_var: Array["N D"] = y_pred.variance()
y_pred_samples: Array["10 N D"] = y_pred.sample(N=10)
```
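To make this concrete, below is a minimal JAX sketch where the parameterized function is a simple linear-in-features model, an illustrative stand-in for whatever `Model` actually is:

```python
import jax
import jax.numpy as jnp

# illustrative mean function f(x, t; params), linear in the features [1, x, t]
def mean_fn(params, x, t):
    phi = jnp.stack([jnp.ones_like(x), x, t], axis=-1)  # (N, 3)
    return phi @ params["W"].T                          # (N, D)

key = jax.random.PRNGKey(0)
params = {"W": jax.random.normal(key, (1, 3))}          # D = 1
sigma = 0.1

# mean of the Gaussian likelihood N(y | f(x, t), sigma^2)
y_pred_mean = mean_fn(params, jnp.asarray(x), jnp.asarray(t))

# draw 10 samples from the likelihood
y_pred_samples = y_pred_mean + sigma * jax.random.normal(
    jax.random.PRNGKey(1), (10, *y_pred_mean.shape)
)
```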
### Criteria
We can learn the underlying parameterization by finding the best parameters, $\boldsymbol{\theta}$, given some loss function, $\mathcal{L}$.

We can maximize the data log-likelihood (equivalently, minimize the negative log-likelihood):
```python
# calculate the negative log-probability
loss: Array[""] = -model(x, t).log_prob(y_true)
# create loss function
loss_fn: Callable = lambda rv_y, y_true: -rv_y.log_prob(y_true)
```
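Under a Gaussian likelihood this log-probability has a closed form. Continuing the JAX sketch from above:

```python
# Gaussian log-likelihood, sum_n log N(y_n | f(x_n, t_n), sigma^2);
# its negative is the loss we minimize during training
def log_prob(params, x, t, y):
    mu = mean_fn(params, x, t)
    return jnp.sum(
        -0.5 * ((y - mu) / sigma) ** 2 - 0.5 * jnp.log(2 * jnp.pi * sigma**2)
    )
```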
To train, we can use any standard loss function and gradient-based optimizer:
```python
# initialize criteria and training regime
loss: Loss = MSE()
optimizer: Optimizer = SGD(learning_rate=0.1)
# learn parameters
params: PyTree = fit_model(
    model=model,
    data=[[x, t], y],
    optimizer=optimizer,
    loss=loss,
)
```
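`fit_model` is left abstract here. As a sketch, a plain gradient-descent loop in JAX that minimizes the negative log-likelihood from the previous snippet (the learning rate and step count are arbitrary choices):

```python
# negative log-likelihood as the training loss
nll = lambda params: -log_prob(params, x, t, y)
grad_fn = jax.jit(jax.grad(nll))

learning_rate = 0.1
for step in range(100):
    grads = grad_fn(params)
    # gradient-descent update on every leaf of the parameter PyTree
    params = jax.tree_util.tree_map(
        lambda p, g: p - learning_rate * g, params, grads
    )
```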
### Inference
```python
# get coordinates for new domain
x_new: Array["M"] = ...
t_new: Array["M"] = ...
# apply model
y_pred: Array["M D"] = model(x_new, t_new, params)
```
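With the learned parameters, inference is just a forward pass on the new coordinates. Continuing the sketch:

```python
# predict on M hypothetical new coordinates
x_new = jnp.linspace(0.0, 1.0, 20)
t_new = jnp.full_like(x_new, 0.5)
y_pred = mean_fn(params, x_new, t_new)  # (M, D)
```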
There are many upgrades we can make:

- Improved loss function
- Conditional model
- Heterogeneous noise model
## Latent Interpolator
### Model
```python
# initialize prior model, p(z)
mean: Array["Dz"] = zeros_like(...)
sigma: Array["Dz"] = ones_like(...)
prior_model: Dist = DiagGaussian(mean=mean, sigma=sigma)
z_samples: Array["N Dz"] = prior_model.sample(N=...)

# initialize mean function - NeRF
mean_fn: Model = init_nerf(...)
x: Array["Ds"] = ...
t: Array[""] = ...
z: Array["Dz"] = ...
y_pred: Array["Dy"] = mean_fn(x, t, z)

# initialize likelihood model w/ parameterized mean fn
sigma: float = ...
likelihood_model: Dist = CondGaussian(mean_fn=mean_fn, scale=sigma)
y_pred: Dist = likelihood_model(x, t, z)
y_pred_mean: Array["Dy"] = y_pred.mean()
y_pred_var: Array["Dy"] = y_pred.variance()
y_pred_samples: Array["N Dy"] = y_pred.sample(N=...)
log_prob: Array[""] = y_pred.log_prob(y)
```
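A minimal JAX sketch of these pieces, with an illustrative linear map over the concatenated inputs standing in for the NeRF mean function:

```python
import jax
import jax.numpy as jnp

# dimensions are arbitrary choices for the sketch
Ds, Dz, Dy = 1, 4, 1
key, wkey, zkey = jax.random.split(jax.random.PRNGKey(0), 3)
params = {"W": jax.random.normal(wkey, (Dy, Ds + 1 + Dz))}
sigma = 0.1

# stand-in for the NeRF mean function f(x, t, z)
def cond_mean_fn(params, x, t, z):
    h = jnp.concatenate([jnp.atleast_1d(x), jnp.atleast_1d(t), z])
    return params["W"] @ h  # (Dy,)

# sample z ~ N(0, I) from the prior and push it through the mean function
z = jax.random.normal(zkey, (Dz,))
y_mean = cond_mean_fn(params, jnp.array([0.3]), jnp.array(0.5), z)
```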
### Criteria
We are interested in finding the best latent variable, $z$, that fits the data, $y$. The posterior is given by Bayes' rule:

$$p(z|y) = \frac{p(y|z)\,p(z)}{p(y)}$$

We can write the criterion as the KL-divergence between the variational distribution, $q(z)$, and the posterior, $p(z|y)$:

$$\min_{q}\; \mathrm{KL}\left[\,q(z)\,\|\,p(z|y)\,\right]$$

The general criterion is given by the ELBO, a lower bound on the log marginal likelihood, $\log p(y)$; maximizing it is equivalent to minimizing the KL-divergence above:

$$\mathcal{L}_{\mathrm{ELBO}} = \mathbb{E}_{q(z)}\left[\log p(y|z)\right] - \mathrm{KL}\left[\,q(z)\,\|\,p(z)\,\right]$$

However, we're going to use the flow-based objective function, which (matching the pseudo-code below) is given by:

$$\mathcal{L} = \mathbb{E}_{z\sim q(z)}\left[\log p(y|x,t,z) - \log q(z)\right]$$
### Pseudo-Code
#### Forward Transformation
First, we need to sample from the variational distribution:
```python
# initialize variational dist, q(z)
var_dist: Dist = DiagGaussian(mean=..., sigma=...)
# sample from variational dist, z ~ q(z)
z_sample: Array["N Dz"] = var_dist.sample(N=...)
```
Now we can calculate the log-probability of the variational distribution and the likelihood terms:
```python
# FORWARD TRANSFORMATION: (y, x, t) --> z
# sample from the data distribution
y_sample: Array["Dy"], x: Array["Ds"], t: Array[""] = Data(N=...)
# likelihood model, log p(y|x,t,z)
log_py: Array["N"] = likelihood_model(x, t, z_sample).log_prob(y_sample)
# variational dist, log q(z)
log_qz: Array["N"] = var_dist.log_prob(z_sample)
# objective per sample, log p(y|x,t,z) - log q(z)
ldj: Array["N"] = log_py - log_qz

# INVERSE TRANSFORMATION: z --> y
z_sample: Array["Dz"] = ...
# sample from the likelihood, y ~ p(y|x,t,z)
y_sample: Array["Dy"] = likelihood_model(x, t, z_sample).sample(N=...)
```
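Putting the forward transformation together as runnable code: a single-sample Monte Carlo estimate of $\log p(y|x,t,z) - \log q(z)$, reusing `cond_mean_fn`, `params`, and `sigma` from the earlier sketch:

```python
# diagonal-Gaussian log-density, summed over dimensions
def diag_gauss_log_prob(v, mean, scale):
    return jnp.sum(
        -0.5 * ((v - mean) / scale) ** 2 - 0.5 * jnp.log(2 * jnp.pi * scale**2)
    )

def objective(q_mean, q_sigma, x, t, y, key):
    # sample z ~ q(z) with the reparameterization trick
    z = q_mean + q_sigma * jax.random.normal(key, q_mean.shape)
    # likelihood term, log p(y | x, t, z)
    log_py = diag_gauss_log_prob(y, cond_mean_fn(params, x, t, z), sigma)
    # variational term, log q(z)
    log_qz = diag_gauss_log_prob(z, q_mean, q_sigma)
    return log_py - log_qz
```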
### Inference
### All Together
## Amortized Model
### Data Representation
```python
# initialize domain
domain: Domain = ...
# initialize values
y_values: Array["Dx Dy"] = ...
# initialize field
y: Field["Dx Dy"] = Field(y_values, domain)
```
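`Domain` and `Field` are left abstract; a hypothetical minimal pair of containers consistent with this pseudo-code might look like:

```python
from dataclasses import dataclass
import jax.numpy as jnp

# hypothetical containers matching the Field/Domain pseudo-code above
@dataclass
class Domain:
    coords: jnp.ndarray  # (Dx, Ds) coordinates of the grid points

@dataclass
class Field:
    values: jnp.ndarray  # (Dx, Dy) values at those coordinates
    domain: Domain
```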
### Model
In this case, we assume that there is some underlying generative model whose latent representation, $z$, can be inferred directly from the field, $y$:
```python
# initialize encoder model
encoder_fn: Model = ...
# initialize Gaussian conditional likelihood
sigma: float = ...
prob_model: CondModel = CondGaussian(mean_fn=encoder_fn, variance=sigma**2)
# apply encoder, N(z | f(y), sigma^2)
z_mean: Array["Dz"] = prob_model.mean(context=y)
z_var: Array["Dz"] = prob_model.variance(context=y)
z_samples: Array["N Dz"] = prob_model.sample(context=y, N=10)
# calculate log-probability, log p(z|y)
log_prob: Array[""] = prob_model.log_prob(z_samples, context=y)
```
We have a constraint: the decoder must map the latent variable back to the field so that the encoder-decoder pair reconstructs the original, $y \approx \text{decode}(\text{encode}(y))$:
```python
# initialize both models
model: Model = EncoderDecoder(encoder=encoder_model, decoder=decoder_model)
params: Params = [encoder_params, decoder_params]
# apply model
y_pred: Array["Dx Dy"] = model(y, params)
```
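As a sketch, a deterministic linear encoder-decoder pair in JAX; the shapes and the PCA-like linear parameterization are illustrative assumptions:

```python
import jax
import jax.numpy as jnp

# arbitrary dimensions for the sketch
Dx, Dy, Dz = 16, 1, 4
k1, k2 = jax.random.split(jax.random.PRNGKey(0))
params = {
    "W_enc": jax.random.normal(k1, (Dz, Dx * Dy)),
    "W_dec": jax.random.normal(k2, (Dx * Dy, Dz)),
}

def encode(params, y):  # (Dx, Dy) -> (Dz,)
    return params["W_enc"] @ y.ravel()

def decode(params, z):  # (Dz,) -> (Dx, Dy)
    return (params["W_dec"] @ z).reshape(Dx, Dy)

def model(y, params):   # reconstruction: y -> z -> y_pred
    return decode(params, encode(params, y))
```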
### Criteria
### Inference
```python
# get observations
y_obs: Array["Dx Dy"] = ...
# apply model
y_pred: Array["Dx Dy"] = model(y_obs, params)
```
There are many improvements we can make to this model:

- Use a simplified linear model (PCA)
- Improved loss function
- Stochastic encoder
**Training Tips**

We can always apply some augmentations to the fields to learn more robust representations.
**Regression-Like**
```python
# create augmentation functions
fns = [
    random_noise,
    resize,
    patchify,
    missing_data,
]
aug_fn: Callable = make_seq_fn(fns)
# apply augmentation function
y: Array["Dx Dy"] = aug_fn(y)
```
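`make_seq_fn` is just sequential composition; a hypothetical minimal implementation, shared by both augmentation pipelines here:

```python
from typing import Callable, Sequence

# compose a list of augmentations into one function applied in order
def make_seq_fn(fns: Sequence[Callable]) -> Callable:
    def seq_fn(y):
        for fn in fns:
            y = fn(y)
        return y
    return seq_fn
```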
**Image-Like**
```python
# create augmentation functions
fns = [
    random_rotate,
    random_flip,
    random_contrast,
    random_translate,
]
aug_fn: Callable = make_seq_fn(fns)
# apply augmentation function
y: Array["Dx Dy"] = aug_fn(y)
```