Pipeline idioms — a recipe gallery
geotoolz ships 14 operators in v0.1, but the algebra is general enough to
express many more patterns today — observers, profilers, retries, caching,
provenance stamps, QC assertions, all using just Operator, Sequential,
and a few lines of subclass code.
This notebook is the recipe book. Each section covers one operator family,
shows the v0.1 op (if shipped) or a minimal build-your-own implementation,
drops it into a Sequential, and notes the tradeoffs.
Status note. Many of the named operators in this notebook (e.g. Profile, Histogram, Try, Cache) are scoped for v0.2+. The library will ship polished versions when they earn their keep, but the patterns work today using only the v0.1 primitives. Treat each build-your-own recipe as "copy until we ship the real one."
Everything below runs on plain integers, NumPy arrays, or dicts (no GeoTensor setup). The
shapes translate directly to real pipelines.
Shared setup — stand-in domain operators
A few minimal Operator subclasses standing in for the domain ops we haven't shipped yet. They're not pretending to be real — they're just named placeholders so the shape of a Sequential([MaskClouds(...), NDVI(...)]) is recognisable.
from geotoolz import Operator, Sequential


class Add(Operator):
    """Tiny test operator: x → x + n."""

    def __init__(self, n: int) -> None:
        self.n = n

    def _apply(self, x: int) -> int:
        return x + self.n

    def get_config(self) -> dict:
        return {"n": self.n}


class _NamedStep(Operator):
    """Named identity-with-tag stand-in for any domain op (NDVI, MaskClouds, ...)."""

    def __init__(self, name: str, *, factor: float = 1.0) -> None:
        self.name = name
        self.factor = factor

    def _apply(self, x):
        return x * self.factor

    def get_config(self) -> dict:
        return {"name": self.name, "factor": self.factor}


# A 'pipeline' that stands in for a real preprocessing chain
demo_pipe = Sequential(
    [
        _NamedStep("MaskClouds", factor=1.0),
        _NamedStep("PercentileClip", factor=0.98),
        _NamedStep("NDVI", factor=0.5),
    ]
)
print(demo_pipe)
print("demo_pipe(10) =", demo_pipe(10))

Sequential([_NamedStep(name='MaskClouds', factor=1.0), _NamedStep(name='PercentileClip', factor=0.98), _NamedStep(name='NDVI', factor=0.5)])
demo_pipe(10) = 4.9
1. Inspection / introspection (the Tap family)
Identity operators with side effects — they let you observe a pipeline mid-flight without breaking the chain. Drop them between steps to log stats, capture intermediates, profile, or validate against a reference.
Tap — fire a callback (shipped)
The seed pattern. Calls fn(x) and passes the input through unchanged. The
return value of fn is ignored: Tap is for side effects, not transforms.
from geotoolz import Tap

seen = []
pipe = Sequential(
    [
        Add(1),
        Tap(lambda x: seen.append(("after Add(1)", x))),
        Add(10),
        Tap(lambda x: seen.append(("after Add(10)", x))),
    ]
)
out = pipe(0)
print("result :", out)
print("log :", seen)

result : 11
log : [('after Add(1)', 1), ('after Add(10)', 11)]
Snapshot — capture intermediates by name (shipped)
A controller (not an Operator itself) that produces snapshot ops via
snap.at(key). After the pipeline runs, intermediates are keyed in
snap[key]. Useful for "what did step 3 produce?" without re-running.
from geotoolz import Snapshot

snap = Snapshot()
pipe = Sequential(
    [
        Add(1),
        snap.at("after_first"),
        Add(10),
        snap.at("after_second"),
        Add(100),
    ]
)
pipe(0)
print("captured:", dict(snap.items()))

captured: {'after_first': 1, 'after_second': 11}
Profile / TimeIt — per-step timings (build-your-own)
The library will ship Profile in v0.2. For now, a small wrapper operator plus
time.perf_counter does the job:
import time
from collections import defaultdict


class Profile:
    """Per-step timing controller. ``profile.wrap(op)`` returns a timed-wrapped op."""

    def __init__(self) -> None:
        self._timings: dict[str, list[float]] = defaultdict(list)

    def wrap(self, op: Operator) -> Operator:
        return _TimedOp(op, self._timings)

    def report(self) -> None:
        for name, ts in self._timings.items():
            print(f" {name:18s} mean={sum(ts) / len(ts) * 1e6:7.1f} µs n={len(ts)}")


class _TimedOp(Operator):
    forbid_in_yaml = True

    def __init__(self, inner: Operator, store: dict) -> None:
        self.inner, self.store = inner, store

    def _apply(self, x):
        t0 = time.perf_counter()
        out = self.inner(x)
        self.store[type(self.inner).__name__].append(time.perf_counter() - t0)
        return out

    def get_config(self) -> dict:
        return {"inner": type(self.inner).__name__}


prof = Profile()
pipe = Sequential([prof.wrap(Add(1)), prof.wrap(Add(10)), prof.wrap(Add(100))])
for _ in range(100):
    pipe(0)
print("Per-step timings:")
prof.report()

Per-step timings:
 Add                mean=    0.6 µs n=300
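Tradeoff. wrap() adds one Python frame per op, which is negligible compared
to GeoTensor work but visible in microbenchmark territory. This sketch also
keys timings by class name, so the three Add steps above collapse into a
single row (n=300); key by position or an explicit label if you need true
per-step numbers.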
Histogram — capture distributions (build-your-own)
A Tap-style op that bins the input. Useful for "did this band shift
from last week?" or "did my correction blow out the bright end?":
import numpy as np


class Histogram:
    """Snapshot-style controller — captures a histogram per named tap site."""

    def __init__(self, *, bins: int = 10) -> None:
        self.bins = bins
        self._dists: dict[str, tuple] = {}

    def at(self, key: str) -> Operator:
        return _HistogramTap(self._dists, key, self.bins)

    def __getitem__(self, key: str):
        return self._dists[key]


class _HistogramTap(Operator):
    forbid_in_yaml = True

    def __init__(self, store, key: str, bins: int) -> None:
        self.store, self.key, self.bins = store, key, bins

    def _apply(self, x):
        arr = np.asarray(x, dtype=float).ravel()
        if arr.size:
            counts, edges = np.histogram(arr, bins=self.bins)
            self.store[self.key] = (counts, edges)
        return x

    def get_config(self) -> dict:
        return {"key": self.key, "bins": self.bins}


hist = Histogram(bins=5)
pipe = Sequential(
    [
        hist.at("input"),
        _NamedStep("PercentileClip", factor=0.5),
        hist.at("after_clip"),
    ]
)
pipe(np.arange(20))
print("input counts:", hist["input"][0])
print("clipped counts:", hist["after_clip"][0])

input counts: [4 4 4 4 4]
clipped counts: [4 4 4 4 4]
Tradeoff. Cheap per call, expensive in aggregate if you bin every chip in a 10k-tile run. For ETL monitoring, sample 1% of tiles instead of every one.
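One way to keep the aggregate cost down is to let only a fraction of calls reach the expensive observer. The sketch below is a hypothetical EveryNth wrapper, not something geotoolz ships. It uses a minimal stand-in Operator base so it runs on its own, and a deterministic stride instead of random sampling so that runs are reproducible.

```python
class Operator:
    """Minimal stand-in for geotoolz.Operator so this sketch runs on its own."""

    def __call__(self, x):
        return self._apply(x)


class EveryNth(Operator):
    """Hypothetical helper: call fn(x) on every n-th input, always pass x through."""

    def __init__(self, fn, *, n: int = 100) -> None:
        if n <= 0:
            raise ValueError("n must be positive")
        self.fn, self.n = fn, n
        self._calls = 0

    def _apply(self, x):
        # Fire on calls 0, n, 2n, ... so the first tile is always observed.
        if self._calls % self.n == 0:
            self.fn(x)
        self._calls += 1
        return x


observed = []
tap = EveryNth(observed.append, n=100)  # roughly 1% of tiles
outputs = [tap(tile_id) for tile_id in range(1000)]
print(len(observed), "of", len(outputs), "tiles observed")
```

Since operators are callable, fn could plausibly be something like hist.at("input") from the recipe above, which would bin one tile in a hundred and pass the rest straight through.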
ShapeTrace — log shape/dtype/CRS at each step (shipped)
Drop one between steps of a Sequential to log what's happening to the
carrier. mode="diff_only" suppresses lines that don't change.
from geotoolz import Lambda, ShapeTrace

trace = ShapeTrace(mode="diff_only")
pipe = Sequential(
    [
        trace,
        Lambda(
            lambda x: np.asarray([x, x * 2, x * 3], dtype=np.int16), name="to_array"
        ),
        trace,
        Lambda(lambda a: a.astype(np.float32), name="to_float"),
        trace,
    ]
)
pipe(7)

shape=None dtype=None crs=None
shape=(3,) dtype=int16 crs=None
shape=(3,) dtype=float32 crs=None
array([ 7., 14., 21.], dtype=float32)
Spy / Hook — cross-cutting hooks (build-your-own)
Process-scoped hooks that fire whenever an operator of a specific type
runs, anywhere in the graph. Same idea as PyTorch forward hooks. The
library will ship a polished Spy in v0.2 — for now, the pattern uses
contextvars to keep hooks scoped (never global):
import contextvars
from contextlib import contextmanager

_active_spies: contextvars.ContextVar[tuple] = contextvars.ContextVar(
    "geotoolz_spy_stack", default=()
)


class Spy:
    def __init__(self) -> None:
        self._hooks: dict[type, list] = {}

    @classmethod
    @contextmanager
    def scoped(cls):
        spy = cls()
        token = _active_spies.set((*_active_spies.get(), spy))
        try:
            yield spy
        finally:
            _active_spies.reset(token)

    def on(self, op_type: type, fn) -> None:
        self._hooks.setdefault(op_type, []).append(fn)


def _fire(op, x_in, x_out):
    for spy in _active_spies.get():
        for fn in spy._hooks.get(type(op), ()):
            fn(op, x_in, x_out)


# Wrap call sites manually for the demo (in v0.2, Operator.__call__ does this).
class _TrackedAdd(Add):
    def __call__(self, x):
        out = super().__call__(x)
        _fire(self, x, out)
        return out


pipe = Sequential([_TrackedAdd(1), _TrackedAdd(10), _TrackedAdd(100)])
with Spy.scoped() as spy:
    spy.on(_TrackedAdd, lambda op, xi, xo: print(f" {op}: {xi} → {xo}"))
    pipe(0)
print()
print("outside the scope — silent:")
pipe(0)

 _TrackedAdd(n=1): 0 → 1
 _TrackedAdd(n=10): 1 → 11
 _TrackedAdd(n=100): 11 → 111

outside the scope — silent:
111
Tradeoff. Scoping is the safe default. A naive global registry —
hooks fire across modules and tests — is a footgun. The contextvars
implementation here costs an empty-tuple iteration when no scope is
active.
Diff — compare against a reference (build-your-own)
The pytest of operator pipelines. Drop inline to catch silent regressions during refactors. Library will ship in v0.2:
class DiffError(Exception):
    pass


class Diff(Operator):
    """Compare input against a stored reference; raise on drift beyond atol."""

    forbid_in_yaml = True

    def __init__(self, *, reference, atol: float = 1e-6) -> None:
        self.reference = np.asarray(reference)
        self.atol = atol

    def _apply(self, x):
        arr = np.asarray(x)
        diff = float(np.max(np.abs(arr - self.reference)))
        if diff > self.atol:
            raise DiffError(f"max abs diff {diff:.4g} exceeds atol={self.atol}")
        return x

    def get_config(self) -> dict:
        return {"atol": self.atol, "ref_shape": self.reference.shape}


# Bless once when you trust the output:
golden = np.array([1, 11, 111])
# Then catches drift on subsequent runs:
arr_in = np.array([0, 10, 110])
pipe = Sequential(
    [
        Lambda(lambda a: a + 1, name="add1"),
        Diff(reference=golden, atol=1e-6),
    ]
)
print("matches golden:", pipe(arr_in))
try:
    pipe(np.array([0, 10, 111]))  # off by one in last element
except DiffError as e:
    print("caught regression:", e)

matches golden: [  1  11 111]
caught regression: max abs diff 1 exceeds atol=1e-06
Tradeoff. Reference files drift over time as legitimate
improvements ship. Pair with version pinning (Diff(reference=v3_ref))
per pipeline version — don't overwrite a blessed reference in place.
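The "never overwrite in place" rule can be enforced mechanically with a write-once registry. The sketch below is a hypothetical ReferenceStore, not a geotoolz API; it uses plain tuples in place of arrays and made-up version labels, and it simply refuses a second bless for a version that already has a reference.

```python
class ReferenceStore:
    """Hypothetical write-once registry of blessed references, keyed by pipeline version."""

    def __init__(self) -> None:
        self._refs: dict = {}

    def bless(self, version: str, reference) -> None:
        if version in self._refs:
            raise KeyError(
                f"reference for {version!r} is already blessed; add a new version instead"
            )
        self._refs[version] = tuple(reference)  # freeze a copy so callers cannot mutate it

    def get(self, version: str) -> tuple:
        return self._refs[version]


refs = ReferenceStore()
refs.bless("v2", [1, 11, 111])
refs.bless("v3", [1, 11, 112])  # a legitimate improvement gets its own version

try:
    refs.bless("v3", [0, 0, 0])  # attempted in-place overwrite
    overwritten = True
except KeyError:
    overwritten = False

print(refs.get("v3"), "overwritten:", overwritten)
```

A pipeline pinned to v2 would then build its check as Diff(reference=refs.get("v2")), and shipping an improvement means blessing v3 next to it rather than editing v2.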
2. Control flow
Conditional execution, fallbacks, retries — same composition primitives, more interesting graphs.
Branch — binary predicate (shipped)
Apply if_true when predicate(x) is truthy, else if_false. Default
if_false=Identity().
from geotoolz import Branch, Identity

guarded = Branch(
    predicate=lambda x: x > 0,
    if_true=Add(100),
    if_false=Identity(),  # default — pass-through for non-positive
)
print("guarded(5) =", guarded(5))
print("guarded(-5)=", guarded(-5))

guarded(5) = 105
guarded(-5)= -5
Switch — multi-way dispatch (shipped)
key(x) selects from cases; falls through to default (default
Identity()) on misses.
from geotoolz import Switch

router = Switch(
    key=lambda x: "even" if x % 2 == 0 else "odd",
    cases={
        "even": Add(1),
        "odd": Sequential([Add(100), Add(100)]),
    },
)
print("router(4) =", router(4))
print("router(3) =", router(3))

router(4) = 5
router(3) = 203
Try / Fallback — exception cascade (build-your-own)
Robust to upstream flakiness — try primary, fall back to fallback if it raises a specified exception. Library will ship in v0.2:
class Try(Operator):
    """Run ``primary``; on listed exception types, run ``fallback`` instead."""

    forbid_in_yaml = True

    def __init__(self, *, primary: Operator, fallback: Operator, on: tuple) -> None:
        if not isinstance(on, tuple):
            raise TypeError("'on' must be a tuple of exception types")
        self.primary, self.fallback, self.on = primary, fallback, on

    def _apply(self, x):
        try:
            return self.primary(x)
        except self.on as e:
            print(f" fallback fired: {type(e).__name__}: {e}")
            return self.fallback(x)

    def get_config(self) -> dict:
        return {
            "primary": type(self.primary).__name__,
            "fallback": type(self.fallback).__name__,
            "on": [t.__name__ for t in self.on],
        }


class Boom(Operator):
    def _apply(self, x):
        raise ConnectionError(f"network unreachable on input {x}")


robust = Try(primary=Boom(), fallback=Add(0), on=(ConnectionError,))
print("robust(7) =", robust(7))

 fallback fired: ConnectionError: network unreachable on input 7
robust(7) = 7
Tradeoff. Silent fallback can hide deteriorating production conditions. Pair with metrics — log every fallback as a counter, alert when the rate exceeds a threshold.
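A minimal sketch of that pairing, assuming an in-process counter is enough for the metrics side. CountingTry, fallback_rate, should_alert and the 5% threshold are hypothetical names and numbers for illustration; the Operator base is a stand-in so the block runs without geotoolz.

```python
class Operator:
    """Minimal stand-in for geotoolz.Operator so this sketch runs on its own."""

    def __call__(self, x):
        return self._apply(x)


class CountingTry(Operator):
    """Hypothetical Try variant that counts fallbacks instead of printing them."""

    def __init__(self, *, primary, fallback, on: tuple, max_rate: float = 0.05) -> None:
        self.primary, self.fallback, self.on = primary, fallback, on
        self.max_rate = max_rate
        self.calls = 0
        self.fallbacks = 0

    @property
    def fallback_rate(self) -> float:
        return self.fallbacks / self.calls if self.calls else 0.0

    def should_alert(self) -> bool:
        # True once the observed fallback rate exceeds the configured threshold.
        return self.fallback_rate > self.max_rate

    def _apply(self, x):
        self.calls += 1
        try:
            return self.primary(x)
        except self.on:
            self.fallbacks += 1
            return self.fallback(x)


def flaky(x):
    """Stand-in primary: fails on every tenth input."""
    if x % 10 == 0:
        raise ConnectionError("network unreachable")
    return x + 1


robust = CountingTry(primary=flaky, fallback=lambda x: x, on=(ConnectionError,))
results = [robust(i) for i in range(100)]
print(f"fallback rate: {robust.fallback_rate:.0%}, alert: {robust.should_alert()}")
```

In a real deployment the two counters would more likely feed whatever metrics system is already in place; the point is only that the fallback path leaves a trace that can be thresholded.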
Coalesce — first non-empty across sources (build-your-own)
"S2 first, fall back to Landsat if S2 too cloudy, then MODIS." Cascades
on a quality predicate (output is bad), distinct from Try which
cascades on an exception.
class Coalesce(Operator):
    """First op whose output passes ``is_ok`` wins."""

    forbid_in_yaml = True

    def __init__(self, sources: list, *, is_ok) -> None:
        self.sources = sources
        self.is_ok = is_ok

    def _apply(self, x):
        for op in self.sources:
            out = op(x)
            if self.is_ok(out):
                return out
        raise RuntimeError("no source produced acceptable output")

    def get_config(self) -> dict:
        return {"n_sources": len(self.sources)}


def is_positive(out):
    return out > 0


coalesced = Coalesce(
    [
        Add(-100),  # produces negative — rejected
        Add(-10),  # still negative — rejected
        Add(50),  # positive — wins
    ],
    is_ok=is_positive,
)
print("coalesced(5) =", coalesced(5))

coalesced(5) = 55
Retry — backoff loop (build-your-own)
Wrap an op with retry + exponential backoff. Most useful for ModelOp
hitting a remote endpoint:
class Retry(Operator):
    forbid_in_yaml = True

    def __init__(
        self, op: Operator, *, attempts: int = 3, base_delay: float = 0.0, on: tuple
    ) -> None:
        self.op, self.attempts, self.base_delay, self.on = op, attempts, base_delay, on

    def _apply(self, x):
        last_exc = None
        for attempt in range(self.attempts):
            try:
                return self.op(x)
            except self.on as e:
                last_exc = e
                if self.base_delay:
                    time.sleep(self.base_delay * (2**attempt))
        raise last_exc

    def get_config(self) -> dict:
        return {
            "attempts": self.attempts,
            "base_delay": self.base_delay,
            "on": [t.__name__ for t in self.on],
        }


_attempts = {"count": 0}


class FlakeyOp(Operator):
    def _apply(self, x):
        _attempts["count"] += 1
        if _attempts["count"] < 3:
            raise TimeoutError(f"transient failure (attempt {_attempts['count']})")
        return x + 1000


robust_call = Retry(FlakeyOp(), attempts=5, on=(TimeoutError,))
print("retrying...")
out = robust_call(7)
print(f"succeeded after {_attempts['count']} attempts, out = {out}")

retrying...
succeeded after 3 attempts, out = 1007
Tradeoff. Retries hide latency from upstream callers — a tile-server
request that nominally takes 200ms can spike to 7s after two retries.
For latency-sensitive paths, prefer Try with a fast fallback.
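To see where such a spike comes from, it helps to write down the worst case before choosing attempts and base_delay. The helper below is a hypothetical back-of-envelope calculation that mirrors the Retry loop above, in which each failed attempt k costs one call plus a sleep of base_delay * 2**k. The numbers are illustrative assumptions, not measurements.

```python
def worst_case_latency(per_call: float, *, attempts: int, base_delay: float) -> float:
    """Upper bound in seconds when every attempt fails or times out."""
    calls = attempts * per_call
    # One backoff sleep per failed attempt: base_delay * 2**0, 2**1, ...
    sleeps = sum(base_delay * 2**k for k in range(attempts))
    return calls + sleeps


# Assumed: each attempt may hang for up to 2 s before timing out.
budget = worst_case_latency(2.0, attempts=3, base_delay=0.25)
print(f"worst case: {budget:.2f} s")  # 3 * 2.0 + (0.25 + 0.5 + 1.0) = 7.75 s
```

If that bound exceeds what the caller can tolerate, lowering attempts or the per-call timeout changes it far more than tuning base_delay does.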
3. Composition
Building blocks for graphs that aren't pure linear chains.
Fanout — one input, many outputs (shipped)
Sugar over a single-input Graph. Computes each branch on the same
input, returns a dict keyed by branch name.
from geotoolz import Fanout

products = Fanout(
    {
        "doubled": Lambda(lambda x: x * 2, name="double"),
        "squared": Lambda(lambda x: x * x, name="square"),
        "labelled": _NamedStep("classify", factor=10),
    }
)
print(products(7))

{'doubled': 14, 'squared': 49, 'labelled': 70}
ApplyToBands — split-apply-combine over an axis (build-your-own)
Apply the inner op independently to each slice along an axis, recombine. Library will ship in v0.2 with a proper axis convention. Minimal version using only v0.1 primitives:
class ApplyToBands(Operator):
    """Apply ``inner`` to each slice along ``axis``, stack results."""

    def __init__(self, inner: Operator, *, axis: int = 0) -> None:
        self.inner = inner
        self.axis = axis

    def _apply(self, arr):
        arr = np.asarray(arr)
        slices = [
            self.inner(np.take(arr, i, axis=self.axis))
            for i in range(arr.shape[self.axis])
        ]
        return np.stack(slices, axis=self.axis)

    def get_config(self) -> dict:
        return {
            "inner": type(self.inner).__name__,
            "inner_config": self.inner.get_config(),
            "axis": self.axis,
        }


# stand-in: scale each "band" by a factor of 10
op = ApplyToBands(Lambda(lambda x: x * 10, name="scale"), axis=0)
arr = np.arange(12).reshape(3, 4)
print("input :")
print(arr)
print("per-band scaled:")
print(op(arr))

input :
[[ 0  1  2  3]
 [ 4  5  6  7]
 [ 8  9 10 11]]
per-band scaled:
[[  0  10  20  30]
 [ 40  50  60  70]
 [ 80  90 100 110]]
Tradeoff. Naive Python-level loop — fine for ~10 bands, painful for
hyperspectral (~200). For hyperspectral, the inner op should ideally
vectorise across the band axis directly; ApplyToBands is the fallback
when it can't.
Cache / Memoize — hash input + config (build-your-own)
Saves hours during iterative analysis where you keep re-running the
same expensive prefix. Cache key is (input_hash, op.config_hash):
import hashlib
import json


class Cache(Operator):
    """Memoise ``inner`` on (input_hash, config_hash). In-memory backend."""

    def __init__(self, inner: Operator) -> None:
        self.inner = inner
        self._store: dict[str, object] = {}
        self._hits = 0
        self._misses = 0

    def _key(self, x) -> str:
        # Cheap stand-in: repr for the input + canonical config
        x_h = hashlib.sha256(repr(x).encode()).hexdigest()[:12]
        cfg_h = hashlib.sha256(
            json.dumps(self.inner.get_config(), sort_keys=True).encode()
        ).hexdigest()[:12]
        return f"{x_h}:{cfg_h}"

    def _apply(self, x):
        k = self._key(x)
        if k in self._store:
            self._hits += 1
            return self._store[k]
        self._misses += 1
        out = self.inner(x)
        self._store[k] = out
        return out

    def get_config(self) -> dict:
        return {"inner": type(self.inner).__name__}


cached = Cache(Sequential([Add(1), Add(10)]))
for _ in range(5):
    cached(0)
print(f"5 calls → {cached._hits} hits, {cached._misses} miss")
print("result :", cached(0))

5 calls → 4 hits, 1 miss
result : 11
Tradeoff. In-memory cache is fast but unbounded — wrap with an LRU
for long-running services. A disk backend is restart-friendly but you
need to garbage-collect old entries. Cache must opt-in (Cache(...)
wraps explicitly), never silent — if Cache ever became a default,
debugging "why does my pipeline produce stale output?" would be a
nightmare.
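A bounded variant can be sketched with collections.OrderedDict from the standard library. LRUCache and maxsize are hypothetical names, the Operator base is a stand-in so the block runs without geotoolz, and the key uses only repr(x) because one wrapper holds one fixed inner op (the same cheap-key caveat as Cache above applies).

```python
from collections import OrderedDict


class Operator:
    """Minimal stand-in for geotoolz.Operator so this sketch runs on its own."""

    def __call__(self, x):
        return self._apply(x)


class LRUCache(Operator):
    """Hypothetical bounded cache: evicts the least recently used entry when full."""

    def __init__(self, inner, *, maxsize: int = 128) -> None:
        if maxsize <= 0:
            raise ValueError("maxsize must be positive")
        self.inner, self.maxsize = inner, maxsize
        self._store: OrderedDict = OrderedDict()
        self.hits = 0
        self.misses = 0

    def _apply(self, x):
        key = repr(x)  # cheap stand-in key
        if key in self._store:
            self.hits += 1
            self._store.move_to_end(key)  # mark as most recently used
            return self._store[key]
        self.misses += 1
        out = self.inner(x)
        self._store[key] = out
        if len(self._store) > self.maxsize:
            self._store.popitem(last=False)  # drop the least recently used entry
        return out


cached = LRUCache(lambda x: x * x, maxsize=2)
for x in (1, 2, 1, 3, 2):
    cached(x)
print(f"{cached.hits} hit, {cached.misses} misses, keys kept: {list(cached._store)}")
```

With maxsize=2 the second request for 2 misses, because inserting 3 evicted it. That is the price of bounding memory, and it is worth sizing maxsize against the working set of an interactive session.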
4. Stateful / ML
Operators that hold state across calls or interact with the broader runtime.
Mode — scoped train / eval switching (build-your-own)
The lesson from PyTorch is that implicit, instance-level, sticky
train() / eval() is a footgun. geotoolz will take the scoped,
explicit path in v0.2 — mode is a context manager on a Sequential,
not a sticky attribute on the graph. Pattern:
_current_mode: contextvars.ContextVar = contextvars.ContextVar(
    "geotoolz_pipeline_mode", default=None
)


@contextmanager
def pipeline_mode(name: str):
    """Enter a named mode for the duration of the with block."""
    if name not in ("train", "eval"):
        raise ValueError(f"mode must be 'train' or 'eval', got {name!r}")
    token = _current_mode.set(name)
    try:
        yield name
    finally:
        _current_mode.reset(token)


class ModeGated(Operator):
    """Wraps an op that only fires under a specific mode."""

    forbid_in_yaml = True

    def __init__(self, inner: Operator, *, mode: str) -> None:
        self.inner, self.required_mode = inner, mode

    def _apply(self, x):
        if _current_mode.get() == self.required_mode:
            return self.inner(x)
        return x

    def get_config(self) -> dict:
        return {"mode": self.required_mode, "inner": type(self.inner).__name__}


# Augmentation only fires in train mode; deterministic preprocess always runs.
pipe = Sequential(
    [
        Add(1),  # preprocess (always)
        ModeGated(Add(1000), mode="train"),  # augmentation (train only)
    ]
)
with pipeline_mode("train"):
    print("train mode:", pipe(0))
with pipeline_mode("eval"):
    print("eval mode :", pipe(0))
# Outside any scope: mode is None, so ModeGated is silent
print("no mode :", pipe(0))

train mode: 1001
eval mode : 1
no mode : 1
Tradeoff. Slightly more verbose than pipe.train() /
pipe.eval(), but the verbosity is the feature: every train-mode invocation
is grep-able in source. Code that forgets to enter the scope cannot produce
augmented "validation" tensors. In this sketch it silently takes the
deterministic path; a stricter variant could raise instead.
Provenance / Watermark — stamp lineage into outputs (build-your-own)
Stamp pipeline metadata into the output so consumers years later can trace it back to the exact recipe. Minimal version:
class Provenance(Operator):
    """Wraps a pipeline; attaches a metadata dict on its way out.

    Real pipelines write this to the output COG's tags via georeader.
    """

    forbid_in_yaml = True

    def __init__(self, inner: Operator) -> None:
        self.inner = inner

    def _apply(self, x):
        out = self.inner(x)
        cfg = self.inner.get_config()
        cfg_canon = json.dumps(cfg, sort_keys=True)
        provenance = {
            "pipeline_hash": hashlib.sha256(cfg_canon.encode()).hexdigest()[:16],
            "operators": _op_names(self.inner),
            "geotoolz_version": "0.0.1",
        }
        # In real code: out.metadata["provenance"] = provenance
        return {"data": out, "_provenance": provenance}

    def get_config(self) -> dict:
        return {"inner": type(self.inner).__name__}


def _op_names(op: Operator) -> list:
    if isinstance(op, Sequential):
        return [type(o).__name__ for o in op.operators]
    return [type(op).__name__]


tracked = Provenance(Sequential([Add(1), Add(10)]))
result = tracked(0)
print("data :", result["data"])
print("provenance :")
for k, v in result["_provenance"].items():
    print(f" {k}: {v}")

data : 11
provenance :
 pipeline_hash: 70ea51d87c78d5eb
 operators: ['Add', 'Add']
 geotoolz_version: 0.0.1
Tradeoff. Provenance metadata grows with every wrap — a
deeply-nested Graph produces a large lineage record. Cap at a
reasonable depth, or store the full graph hash plus a short
operator-name summary rather than every config. Don't let provenance
bloat overshadow the pixel data.
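The "full hash plus short summary" option can be sketched in a few lines of standard-library code. compact_provenance and max_names are hypothetical names for illustration; the helper takes plain lists of configs and operator names, so it makes no assumption about how a real graph exposes them.

```python
import hashlib
import json


def compact_provenance(configs: list, names: list, *, max_names: int = 5) -> dict:
    """Hypothetical helper: one hash over every config, plus a capped name summary."""
    canon = json.dumps(configs, sort_keys=True)
    summary = names[:max_names]
    if len(names) > max_names:
        summary = summary + [f"... +{len(names) - max_names} more"]
    return {
        # The hash still covers the full configuration, so nothing is lost for auditing.
        "pipeline_hash": hashlib.sha256(canon.encode()).hexdigest()[:16],
        "operators": summary,
        "n_operators": len(names),
    }


configs = [{"n": i} for i in range(12)]  # stand-in for 12 operator configs
names = ["Add"] * 12
record = compact_provenance(configs, names, max_names=3)
print(record["operators"], record["n_operators"])
```

The record stays a fixed, small size however deep the graph is, while the hash still changes whenever any operator's config changes.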
5. Validation / QC (the assertion family)
The other identity-with-side-effect family — pass-through ops that check invariants rather than observe state.
AssertX — assertions as operators (build-your-own)
Drop anywhere in a pipeline to enforce a contract. The library will
ship a family of these in geotoolz.qc in v0.2:
class QCError(Exception):
    pass


class AssertValueRange(Operator):
    """Pass-through; raise (or warn) on values outside [min_val, max_val]."""

    def __init__(
        self, *, min_val: float, max_val: float, on_fail: str = "raise"
    ) -> None:
        if on_fail not in ("raise", "warn"):
            raise ValueError("on_fail must be 'raise' or 'warn'")
        self.min, self.max, self.on_fail = min_val, max_val, on_fail

    def _apply(self, x):
        arr = np.asarray(x)
        lo, hi = float(np.nanmin(arr)), float(np.nanmax(arr))
        if lo < self.min or hi > self.max:
            msg = f"range [{lo:.3g}, {hi:.3g}] outside [{self.min}, {self.max}]"
            if self.on_fail == "raise":
                raise QCError(msg)
            print("WARNING:", msg)
        return x

    def get_config(self) -> dict:
        return {"min_val": self.min, "max_val": self.max, "on_fail": self.on_fail}


qc = AssertValueRange(min_val=0, max_val=100, on_fail="warn")
print("good input (passes silently):")
qc(np.array([5, 25, 90]))
print("(no warning, value passed through)")
print()
print("bad input (warning fires):")
qc(np.array([5, 25, 150]))
print()
print("strict mode raises:")
strict = AssertValueRange(min_val=0, max_val=100)
try:
    strict(np.array([5, 25, 150]))
except QCError as e:
    print("caught:", e)

good input (passes silently):
(no warning, value passed through)

bad input (warning fires):
WARNING: range [5, 150] outside [0, 100]

strict mode raises:
caught: range [5, 150] outside [0, 100]
The bigger pattern. Every QC check is a research-time "I bet this won't break" turned into a permanent runtime guard. CI gets a pipeline-correctness suite for free, and pipelines self-document their preconditions.
Quarantine — non-raising QC (build-your-own)
If a check fails, route the bad input to a sidecar, return a sentinel, let the pipeline continue. The "log_and_continue" of QC:
class Quarantine(Operator):
    """On check failure: log + return sentinel; on success: pass through."""

    forbid_in_yaml = True

    def __init__(self, check: Operator, *, sentinel, on_quarantine=None) -> None:
        self.check = check
        self.sentinel = sentinel
        self.on_quarantine = on_quarantine or (lambda x, err: None)

    def _apply(self, x):
        try:
            self.check(x)
        except Exception as e:  # broad — Quarantine catches by design
            self.on_quarantine(x, e)
            return self.sentinel
        return x

    def get_config(self) -> dict:
        return {"check": type(self.check).__name__}


log: list = []
pipe = Sequential(
    [
        Quarantine(
            AssertValueRange(min_val=0, max_val=100),
            sentinel=np.array([-1]),  # marker for "quarantined"
            on_quarantine=lambda x, e: log.append(f"quarantined: {e}"),
        ),
    ]
)
print("good:", pipe(np.array([5, 25])))
print("bad: ", pipe(np.array([5, 999])))
print("log :", log)

good: [ 5 25]
bad:  [-1]
log : ['quarantined: range [5, 999] outside [0, 100]']
Tradeoff. Quarantine is a hedge — it trades immediate failure for
delayed debugging. Don't quarantine errors that indicate genuine bugs
(wrong CRS, wrong band order); those should raise. Quarantine is for
expected edge cases in the data (corrupt scenes, partial downloads,
sensor glitches).
6. Small but load-bearing building blocks
Boring on their own, indispensable in combination.
Identity / Const / Lambda / Sink (all shipped)
Recap: Identity for explicit no-op, Const for fixed-value, Lambda
for inline callables, Sink for composable terminal writes.
from geotoolz import Const, Sink

# Branch with explicit Identity in if_false — self-documenting no-op
opt_clean = Branch(
    predicate=lambda x: x > 0,
    if_true=Sequential([Add(100), Add(-50)]),
    if_false=Identity(),
)
print("opt_clean(10) =", opt_clean(10))
print("opt_clean(-5) =", opt_clean(-5))

# Sink: write side effect + pass through
written: list = []
checkpoint_pipe = Sequential(
    [
        Add(1),
        Sink(written.append, name="checkpoint_after_first"),
        Add(10),
    ]
)
print("checkpoint_pipe(0) =", checkpoint_pipe(0))
print("checkpoint stored :", written)

# Const: fixed value, useful for synthetic sources / golden tests
golden_input = Const(np.arange(5))
synth_test = Sequential(
    [
        golden_input,  # ignores actual input
        Lambda(lambda a: a.sum(), name="sum"),
    ]
)
print("synth test sum:", synth_test("real_input_ignored"))

opt_clean(10) = 60
opt_clean(-5) = -5
checkpoint_pipe(0) = 11
checkpoint stored : [1]
synth test sum: 10
Subsample — stride decimation (build-your-own)
Random or stride-based pixel subsampling for fast visualisation off a
full-resolution carrier. There are two shapes: Tap-style (sample-and-call)
and standalone (return a decimated GeoTensor). Only the standalone,
stride-based shape is shown here:
class Subsample(Operator):
    """Stride-decimate the last two axes. Standalone transform."""

    def __init__(self, *, stride: int = 10) -> None:
        if stride <= 0:
            raise ValueError("stride must be positive")
        self.stride = stride

    def _apply(self, arr):
        arr = np.asarray(arr)
        return arr[..., :: self.stride, :: self.stride]

    def get_config(self) -> dict:
        return {"stride": self.stride}


big = np.arange(100).reshape(10, 10)
small = Subsample(stride=3)(big)
print("input shape :", big.shape)
print("decimated shape:", small.shape)
print(small)

input shape : (10, 10)
decimated shape: (4, 4)
[[ 0  3  6  9]
 [30 33 36 39]
 [60 63 66 69]
 [90 93 96 99]]
Tradeoff. Random subsampling biases summary statistics for non-uniform fields (e.g., concentrated NDVI hotspots in mostly-bare scenes). For fair visualisation, use stride; for fair statistics, use weighted random sampling.
7. Two design rules
Two small disciplines keep the surface tractable as the trick library grows.
Honest naming
Don't disguise an assertion as a transform. Don't disguise a side
effect as a computation. Users should be able to scan a Sequential
and immediately see which steps mutate, which observe, which
guard, and which steer control flow.
Sequential([
    _NamedStep("MaskClouds"),                                 # transform
    AssertValueRange(min_val=0, max_val=10_000,               # guard
                     on_fail="warn"),
    Tap(log_stats),                                           # observe
    Branch(predicate=..., if_true=..., if_false=Identity()),  # control flow
    _NamedStep("NDVI"),                                       # transform
])
Suggested naming:
| Family | Prefix |
|---|---|
| Assertions | qc.Assert* |
| Observers | core.Tap, core.Snapshot, core.Histogram, core.Profile, core.ShapeTrace |
| Control flow | core.Branch, core.Switch, core.Try, core.Coalesce, core.Retry |
| Transforms | everything else |
Round-trip discipline
Operators that hold closures (Tap, Lambda, Branch, Switch,
Sink, ModelOp, the build-your-own ones in this notebook with
forbid_in_yaml = True) cannot round-trip to YAML faithfully. Their
get_config() is a debug repr.
Future YAML loaders should:
- Refuse to instantiate any operator with forbid_in_yaml = True
- Refuse to dump a graph containing one
Production pipelines never contain closures. This keeps "operator graph as audit artifact" honest — every operator in a regulatory artifact has a stable config, every config round-trips, every artifact reruns to the same answer.
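The dump-side half of that rule could be sketched as a pre-dump check. Everything here is an assumption for illustration: check_dumpable, Scale and ClosureTap are hypothetical names, the Operator base is a stand-in, and the flat operator list sidesteps whatever graph-walking a real loader would need.

```python
class Operator:
    """Minimal stand-in for geotoolz.Operator (an assumption, not the real base class)."""

    forbid_in_yaml = False  # config-only operators may round-trip

    def get_config(self) -> dict:
        return {}


class Scale(Operator):
    """Config-only operator: everything it needs is in get_config()."""

    def __init__(self, factor: float) -> None:
        self.factor = factor

    def get_config(self) -> dict:
        return {"factor": self.factor}


class ClosureTap(Operator):
    """Holds an arbitrary callable, so it opts out of YAML."""

    forbid_in_yaml = True

    def __init__(self, fn) -> None:
        self.fn = fn


def check_dumpable(operators) -> list:
    """Hypothetical pre-dump check: return configs, or raise if any operator opts out."""
    blocked = [type(op).__name__ for op in operators if op.forbid_in_yaml]
    if blocked:
        raise TypeError(f"refusing to dump; closure-holding operators: {blocked}")
    return [{"type": type(op).__name__, **op.get_config()} for op in operators]


clean = check_dumpable([Scale(2.0), Scale(0.5)])
try:
    check_dumpable([Scale(2.0), ClosureTap(print)])
    refused = False
except TypeError:
    refused = True
print(clean, "refused:", refused)
```

Failing at dump time, rather than writing a lossy debug repr, is what keeps the resulting file trustworthy as an audit artifact.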
8. The shape
Once these primitives exist, most user "Operator subclass" needs go away.
| Instead of writing… | Compose… |
|---|---|
| LoggedNDVI | Tap(log) \| NDVI(...) |
| OptionalCorrection | Branch(predicate, if_true=Correction(), if_false=Identity()) |
| RobustModelOp | Retry(ModelOp(...), attempts=3) \| Try(..., fallback=...) |
| BandwiseSpeckle | ApplyToBands(LeeSpeckle(...), axis=0) |
| CachedPipeline | Cache(my_pipeline) |
| TimedPipeline | Profile().wrap(my_pipeline) |
| MultiOutputModel | Fanout({"a": op_a, "b": op_b}) |
| WithLogging | Tap(log_stats) |
| WithProvenance | Provenance(my_pipeline) |
| S2OrLandsatNDVI | Switch(key=lambda gt: gt.metadata["sensor"], cases={...}) |
The library's primitive set does the work; users compose. Adding a new trick adds one Operator, not a family of variants.
Where next
- Concepts — the model behind the primitives.
- Composition core walkthrough — primitives demonstrated against scalars.
- Deployment shapes — same primitives in 13 deployment patterns (notebook, ETL, FastAPI, tile server, orchestrator, regulatory artifact, ...).
- Core API reference — operator-by-operator docs.