Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

07 — Deployment shapes (13 patterns)

A gallery of patterns showing where the geotoolz operator algebra fits — across notebook exploration, ETL, ML, web APIs, and operational tooling.

The unifying claim: the same operator graph runs across all of these. What changes between cases is who drives — a notebook user, an HTTP request, a scheduler, a sampler, a tile renderer, or another graph.

About the dummy code. Each section preserves the shape of the deployment pattern (which class is loaded where, who calls pipe(gt), how the result flows out) but uses stand-in Operator subclasses for domain ops we haven’t shipped yet, and skips actually starting servers / orchestrators. Treat each section as a runnable structural sketch.

When real domain operators (gz.cloud.MaskClouds, gz.indices.NDVI, ...) land in v0.2+, swap the dummy stand-ins for them and the rest of the code stands as-is.

The thirteen sections below correspond to the thirteen recipes; your rendered docs will pick them up as a sidebar TOC automatically.

Shared setup — stand-in domain operators & fake catalog

Reused across most cases. Each fake op is a thin Operator subclass that takes the shape of the equivalent design-doc op (MaskClouds, NDVI, ...) but does trivial work over numbers.

import hashlib

import numpy as np
from geotoolz import (
    Graph,
    Identity,
    Input,
    Lambda,
    ModelOp,
    Operator,
    Sequential,
    Tap,
)


# Stand-in domain operators ------------------------------------------------
class _FakeMaskClouds(Operator):
    """Stand-in for ``gz.cloud.MaskClouds``."""

    def __init__(self, *, qa_band: int = -1, bits=(10, 11)) -> None:
        self.qa_band = qa_band
        self.bits = tuple(bits)

    def _apply(self, x):
        # Dummy: zero-out 'cloudy' pixels (negative values)
        arr = np.asarray(x).copy()
        arr[arr < 0] = 0
        return arr

    def get_config(self) -> dict:
        return {"qa_band": self.qa_band, "bits": list(self.bits)}


class _FakeNDVI(Operator):
    def __init__(self, *, red_idx: int = 2, nir_idx: int = 3) -> None:
        self.red_idx, self.nir_idx = red_idx, nir_idx

    def _apply(self, x):
        # Dummy: arr → arr / 10
        return np.asarray(x) / 10.0

    def get_config(self) -> dict:
        return {"red_idx": self.red_idx, "nir_idx": self.nir_idx}


class _FakeToBOA(Operator):
    def __init__(self, *, sun_zenith_band: int = -2) -> None:
        self.sun_zenith_band = sun_zenith_band

    def _apply(self, x):
        return np.asarray(x) * 1.02

    def get_config(self) -> dict:
        return {"sun_zenith_band": self.sun_zenith_band}


class _FakeDarkObject(Operator):
    def __init__(self, *, percentile: int = 1) -> None:
        self.percentile = percentile

    def _apply(self, x):
        arr = np.asarray(x)
        return arr - np.percentile(arr, self.percentile)

    def get_config(self) -> dict:
        return {"percentile": self.percentile}


# Stand-in catalog / reader -------------------------------------------------
class FakeRow:
    def __init__(self, scene_id: str, payload):
        self.id = scene_id
        self.payload = payload


class FakeCatalog:
    def __init__(self, rows):
        self._rows = list(rows)

    def iter_rows(self):
        yield from self._rows

    def __len__(self):
        return len(self._rows)


def make_demo_catalog(n: int = 5) -> FakeCatalog:
    rng = np.random.default_rng(0)
    rows = [
        FakeRow(f"scene_{i:03d}", rng.integers(-2, 100, size=(4,))) for i in range(n)
    ]
    return FakeCatalog(rows)


def fake_save_cog(arr, dest: str) -> str:
    """No-op stand-in for georeader.save_cog. Returns the path it pretended to write."""
    return dest


print("Shared fixtures ready — fake ops, FakeCatalog, fake_save_cog.")
Shared fixtures ready — fake ops, FakeCatalog, fake_save_cog.

1. Notebook exploration

The setting. The first place a new operator earns its keep. A researcher debugging why does my output look like that? needs to see intermediate state without exploding the pipeline into ten named variables. Tap is the under-rated trick — an identity operator with a side effect lets users live inside the chain instead of breaking it apart for inspection.

Driver: human at a notebook keyboard.

# A typical exploration pipeline with inline observation
seen = []

viz = Sequential(
    [
        _FakeMaskClouds(qa_band=-1, bits=[10, 11]),
        Tap(lambda gt: seen.append(("after_mask", float(np.nanmean(gt))))),
        _FakeDarkObject(percentile=2),
        Tap(lambda gt: seen.append(("after_correction", float(np.nanmean(gt))))),
        _FakeNDVI(red_idx=2, nir_idx=3),
    ]
)

scene = np.array([10, -5, 50, 100])
result = viz(scene)

print("final result:", result)
print()
print("intermediate stats logged inline:")
for stage, mean in seen:
    print(f"  {stage:18s}  mean = {mean:.3f}")
final result: [ 0.94 -0.06  4.94  9.94]

intermediate stats logged inline:
  after_mask          mean = 40.000
  after_correction    mean = 39.400

Tradeoff. The fn in Tap is a closure, so Tap is not config-round-trippable — its get_config() is a debug repr. That’s the correct choice for a debug primitive. Production pipelines should not contain Tap. A forbid_in_yaml=True flag on the class prevents accidental escape from the notebook.

2. ETL pipelines

The setting. A one-off notebook script becomes a scheduled job that runs nightly across thousands of scenes. The two failure modes the library is meant to remove: rewriting the pipeline to fit the orchestrator, and divergence between the research notebook version and the production version. ETL here means: catalog drives the iteration; same operator graph applies to every row; results land in storage.

Driver: scheduler (cron / Airflow / etc.) → CatalogPipelinereader → op → writer per row.

# Same operator graph used in the notebook above
etl = Sequential(
    [
        _FakeMaskClouds(qa_band=-1, bits=[10, 11]),
        _FakeDarkObject(percentile=1),
        _FakeToBOA(sun_zenith_band=-2),
        _FakeNDVI(red_idx=2, nir_idx=3),
    ]
)


class CatalogPipeline:
    """Stand-in for the v0.2+ ``gz.catalog_ops.CatalogPipeline``."""

    def __init__(self, catalog, op, *, on_row_error: str = "raise") -> None:
        self.catalog, self.op = catalog, op
        if on_row_error not in ("raise", "log_and_continue"):
            raise ValueError("on_row_error must be 'raise' or 'log_and_continue'")
        self.on_row_error = on_row_error
        self.errors: list = []

    def run(self) -> list:
        results = []
        for row in self.catalog.iter_rows():
            try:
                # In real code: gt = reader.read_geoslice(row.to_geoslice())
                gt = row.payload
                out = self.op(gt)
                results.append((row.id, out))
            except Exception as e:
                if self.on_row_error == "raise":
                    raise
                self.errors.append((row.id, str(e)))
                print(f"  [skip] {row.id}: {e}")
        return results


catalog = make_demo_catalog(n=5)
runner = CatalogPipeline(catalog, etl, on_row_error="log_and_continue")
results = runner.run()

print(f"\nprocessed {len(results)} rows from {len(catalog)} candidates")
for row_id, out in results:
    print(f"  {row_id}  →  {out}")
print(f"errors: {len(runner.errors)}")

processed 5 rows from 5 candidates
  scene_000  →  [ 5.9415  3.6975  2.4735 -0.0765]
  scene_001  →  [ 2.95188  0.19788  0.50388 -0.00612]
  scene_002  →  [-0.14994  6.48006  4.84806  7.60206]
  scene_003  →  [-0.0306  0.9894  4.8654  2.3154]
  scene_004  →  [ 0.91188 -0.00612  0.19788  4.07388]
errors: 0

Tradeoff. Process-pool parallelism crosses pickle boundaries — every operator and every reader must be pickleable. Closures, lambdas, and non-default-pickled cloud credentials break here, often silently. The on_row_error="log_and_continue" default is a hedge against the “one bad scene crashes the 10k-scene job” anti-pattern; the cost is that quiet failures need monitoring (count of skipped rows per run, failure rate alerts).

3. ML training and inference

The setting. Train/serve skew is a perennial source of silent ML bugs — training preprocessing diverges from inference preprocessing over time, the model performs well in evaluation and badly in production, and nobody notices until a stakeholder spots the regression months later. The library’s claim: the same preprocess object lives in both paths, by construction.

Driver: training loop (DataLoader) at train time; tile sampler + ApplyToChips at inference time. Both use the same preprocess.

# === Shared preprocessing — defined ONCE ===
preprocess = Sequential(
    [
        _FakeMaskClouds(),
        _FakeDarkObject(percentile=2),
        _FakeToBOA(),
    ]
)


# === Training augmentation — train-only ===
class _RandomFlip(Operator):
    def __init__(self, *, p: float = 0.5, rng=None) -> None:
        self.p = p
        self.rng = rng or np.random.default_rng()

    def _apply(self, x):
        if self.rng.random() < self.p:
            return np.asarray(x)[..., ::-1]
        return x

    def get_config(self) -> dict:
        return {"p": self.p}


augment = Sequential([_RandomFlip(p=0.5, rng=np.random.default_rng(42))])


# === Training-time dataloader (in real code this would be torch.IterableDataset) ===
def training_batches(catalog, *, n_batches: int = 3):
    rng = np.random.default_rng(0)
    for _ in range(n_batches):
        rows = list(catalog.iter_rows())
        row = rows[rng.integers(0, len(rows))]
        gt = row.payload
        x = augment(preprocess(gt))  # both run at train time
        yield x


# === Inference-time pipeline (same preprocess, no augment) ===
class _FakeUNet:
    """Fake model: returns input shifted."""

    def __call__(self, arr):
        return arr + 1000


infer = Sequential(
    [
        preprocess,
        ModelOp(_FakeUNet()),
    ]
)


catalog = make_demo_catalog(n=10)

print("Training batches (with augmentation):")
for i, batch in enumerate(training_batches(catalog, n_batches=3)):
    print(f"  batch {i}: {batch}")

print("\nInference (same preprocess + model, no augmentation):")
for row in list(catalog.iter_rows())[:3]:
    print(f"  {row.id}: {infer(row.payload)}")
Training batches (with augmentation):
  batch 0: [ 6.7116 87.2916 -0.4284 53.6316]
  batch 1: [-2.2644 51.7956 83.4156 35.4756]
  batch 2: [24.9288 81.0288 65.7288 -1.5912]

Inference (same preprocess + model, no augmentation):
  scene_000: [1058.65 1036.21 1023.97  998.47]
  scene_001: [1029.4576 1001.9176 1004.9776  999.8776]
  scene_002: [ 997.0012 1063.3012 1046.9812 1074.5212]

Tradeoff. Closures inside augment Operators (e.g. a captured RNG) break DataLoader workers — each worker gets the same seed and produces correlated batches. The fix is per-Operator seed= arguments and a worker_init_fn that re-seeds. Chip-stitching overlap (stride < chip_size) costs ~30% more reads for smoother boundaries; for fast inference set stride == chip_size and accept seam artefacts.

4. LitServe inference API

The setting. The model ships as an HTTP service, a long-lived process holding the GPU and serving request/response over JSON. LitServe is a good fit because its decode/predict/encode split lines up exactly with read → operator-graph → serialise.

Driver: HTTP request → LitAPI lifecycle (decode_requestpredictencode_response). The pipeline is loaded once in setup.

The cell below shows the structure of a LitAPI handler without actually starting LitServe (no GPU in this environment). The decode_request / predict / encode_response methods are exercised on a stand-in request.

import json as _json


class MethanePipelineAPI:
    """Sketch of a litserve.LitAPI subclass — not actually launched."""

    def setup(self, device):
        """Load the pipeline once at process startup."""
        self.pipeline = Sequential(
            [
                _FakeMaskClouds(qa_band=-1, bits=[10, 11]),
                _FakeDarkObject(percentile=1),
                _FakeNDVI(red_idx=2, nir_idx=3),
            ]
        )
        cfg_canon = _json.dumps(self.pipeline.get_config(), sort_keys=True)
        self.config_hash = hashlib.sha256(cfg_canon.encode()).hexdigest()[:16]

    def decode_request(self, req: dict):
        # In real code: gt = georeader.read_from_url(req["url"], bounds=req["bbox"])
        return np.asarray(req["payload"])

    def predict(self, gt):
        return self.pipeline(gt)

    def encode_response(self, gt_out) -> dict:
        return {
            "result": gt_out.tolist(),
            "stats": {
                "mean": float(np.nanmean(gt_out)),
                "min": float(np.nanmin(gt_out)),
                "max": float(np.nanmax(gt_out)),
            },
            "config_hash": self.config_hash,
        }


api = MethanePipelineAPI()
api.setup(device="cpu")
fake_request = {
    "url": "s3://bucket/scene_001.tif",
    "bbox": [-1, -1, 1, 1],
    "payload": [10, -5, 50, 100],
}
print("Request payload:", fake_request["payload"])
print()
decoded = api.decode_request(fake_request)
predicted = api.predict(decoded)
response = api.encode_response(predicted)

print("Response:")
print(_json.dumps(response, indent=2, default=float))
Request payload: [10, -5, 50, 100]

Response:
{
  "result": [
    0.97,
    -0.03,
    4.970000000000001,
    9.97
  ],
  "stats": {
    "mean": 3.97,
    "min": -0.03,
    "max": 9.97
  },
  "config_hash": "990c9d4a5c66301a"
}

Tradeoff. Including config_hash in every response is cheap insurance — clients can detect “the model changed under me” without out-of-band coordination. Cold start is dominated by setup(): keep heavy state (model weights, calibration tables, target spectra) in long-lived attributes, never construct them per predict(). Batch dynamically (LitServe’s max_batch_size) only if the underlying model genuinely benefits — geospatial operators often don’t, since the chip-size dimension already saturates the GPU.

5. Backend API (FastAPI, multi-pipeline)

The setting. A platform team running many models wants one service that hosts them all. Pipeline registry pattern: load every pipeline at startup, route based on URL path. Trades GPU pinning for flexibility — an operations team can roll out a new pipeline by adding a YAML entry and restarting.

Driver: HTTP request → router → registered pipeline. The /pipelines/{name} endpoint matters more than people think: get_config() lets the API describe its own behaviour to clients without out-of-band docs.

Stand-in: a PipelineService class that mirrors a FastAPI router without actually starting Uvicorn. The HTTP plumbing is one decorator-call layer above what’s shown here.

class PipelineService:
    """Routes requests to named pipelines. Returns config + result."""

    def __init__(self, pipelines: dict) -> None:
        self.pipelines = pipelines
        self.hashes = {
            name: hashlib.sha256(
                _json.dumps(p.get_config(), sort_keys=True).encode()
            ).hexdigest()[:12]
            for name, p in pipelines.items()
        }

    def process(self, name: str, payload) -> dict:
        """@app.post('/process/{name}') equivalent."""
        if name not in self.pipelines:
            raise KeyError(f"unknown pipeline: {name}")
        gt = np.asarray(payload)
        out = self.pipelines[name](gt)
        out_uri = fake_save_cog(out, f"s3://outputs/{name}_demo.tif")
        return {"out_uri": out_uri, "config_hash": self.hashes[name]}

    def describe(self, name: str) -> dict:
        """@app.get('/pipelines/{name}') equivalent."""
        return self.pipelines[name].get_config()

    def list_pipelines(self) -> dict:
        """@app.get('/pipelines') equivalent."""
        return dict(self.hashes)


# Three named pipelines registered at startup
service = PipelineService(
    {
        "methane_v3": Sequential([_FakeMaskClouds(), _FakeDarkObject(percentile=1)]),
        "methane_v4": Sequential([_FakeMaskClouds(), _FakeDarkObject(percentile=2)]),
        "ndvi_etl": Sequential([_FakeMaskClouds(), _FakeNDVI()]),
    }
)

print("Pipeline registry:")
for name, h in service.list_pipelines().items():
    print(f"  {name:12s}  hash={h}")

print("\n/process/methane_v3 →", service.process("methane_v3", [10, 20, 30]))

print("\n/pipelines/ndvi_etl describe:")
print(_json.dumps(service.describe("ndvi_etl"), indent=2))
Pipeline registry:
  methane_v3    hash=f77c6471b06d
  methane_v4    hash=86b2a4c0d6e6
  ndvi_etl      hash=105afc6cf05f

/process/methane_v3 → {'out_uri': 's3://outputs/methane_v3_demo.tif', 'config_hash': 'f77c6471b06d'}

/pipelines/ndvi_etl describe:
{
  "operators": [
    {
      "class": "_FakeMaskClouds",
      "config": {
        "qa_band": -1,
        "bits": [
          10,
          11
        ]
      }
    },
    {
      "class": "_FakeNDVI",
      "config": {
        "red_idx": 2,
        "nir_idx": 3
      }
    }
  ]
}

Tradeoff. Loading every pipeline at startup grows memory linearly in the number of registered pipelines — fine for a few dozen, painful at hundreds. Above that scale, lazy-load on first request and cache. Hot reload (swap a YAML without restart) sounds attractive but interacts badly with in-flight requests holding references to the old graph; safer to require a process restart and let the load balancer drain.

6. User-uploaded pipelines for viz

The setting. The democratisation case. A scientist or analyst — not necessarily a Python engineer — wants to compose a pipeline from a UI, see the result, and share the recipe. Letting a user submit YAML as a request is a natural mapping, but it bites the moment you call hydra.utils.instantiate() on untrusted input: a malicious payload like _target_: os.system, args: ["rm -rf /"] is arbitrary code execution.

The fix is a sandboxed loader that walks the config tree and rejects any _target_ not in a pre-vetted public registry.

Driver: Streamlit / web UI textbox → POST → server-side sandboxed loader → pipeline → render.

# A tiny registry — the trustworthy public surface
ALLOWED = {
    "Sequential": Sequential,
    "_FakeMaskClouds": _FakeMaskClouds,
    "_FakeNDVI": _FakeNDVI,
    "_FakeDarkObject": _FakeDarkObject,
    "Identity": Identity,
}


class UntrustedTargetError(Exception):
    def __init__(self, target):
        self.target = target
        super().__init__(f"Operator '{target}' is not in the public registry")


def load_dict_sandboxed(cfg: dict):
    """Sandboxed builder. Refuses any class not in ALLOWED."""
    if not isinstance(cfg, dict):
        return cfg
    target = cfg.get("_target_")
    if target is None:
        return cfg
    if target not in ALLOWED:
        raise UntrustedTargetError(target)
    cls = ALLOWED[target]
    # Recursively build child configs
    kwargs = {k: load_dict_sandboxed(v) for k, v in cfg.items() if k != "_target_"}
    # 'operators' for Sequential is a list of child configs
    if "operators" in kwargs:
        kwargs["operators"] = [load_dict_sandboxed(c) for c in kwargs["operators"]]
    return cls(**kwargs)


# A safe submission
safe_yaml = {
    "_target_": "Sequential",
    "operators": [
        {"_target_": "_FakeMaskClouds", "qa_band": -1, "bits": [10, 11]},
        {"_target_": "_FakeNDVI", "red_idx": 2, "nir_idx": 3},
    ],
}

pipe = load_dict_sandboxed(safe_yaml)
print("safe submission ran:", pipe(np.array([10, -5, 50])))


# A malicious submission
evil_yaml = {
    "_target_": "Sequential",
    "operators": [
        {"_target_": "os.system", "args": ["rm -rf /"]},
    ],
}

try:
    load_dict_sandboxed(evil_yaml)
except UntrustedTargetError as e:
    print(f"\nrejected: {e}")
safe submission ran: [1. 0. 5.]

rejected: Operator 'os.system' is not in the public registry

Tradeoff. The sandbox doesn’t solve resource exhaustion — a malicious user can still submit a config that allocates huge intermediates or schedules a million Sequential steps. Time and memory limits per request are mandatory. The other side of the same coin: this pattern also gates internal contributors — a colleague’s experimental Operator can’t run in the demo until it lands in the registry, which is a feature for governance and a friction point for researchers. The right balance is a fast registry-promotion process, not a permissive sandbox.

7. Tile server (z/x/y → PNG)

The setting. Slippy-map UX — a user pans a Leaflet/Mapbox view, the browser issues XYZ tile requests, the server renders each tile on demand. Distinct from a generic backend API: every request is (z, x, y, layer), the response is a 256×256 PNG, p99 must be sub-second, a single map view fans out to dozens of concurrent requests.

Driver: browser map → XYZ tile request → layer registry → operator graph runs once per tile (often cached upstream by a CDN).

Operators in this hot loop run ~10⁴ times per minute, so per-call construction work is the enemy. Load LUTs / state once in __init__, never per call.

class _FakeColormap(Operator):
    """Stand-in for ``gz.viz.Colormap``. Builds its LUT once at construction."""

    def __init__(
        self, *, name: str = "viridis", vmin: float = 0, vmax: float = 1
    ) -> None:
        self.name, self.vmin, self.vmax = name, vmin, vmax
        # one-time work — reused across every call (the whole point)
        rng = np.random.default_rng({"viridis": 0, "RdYlGn": 1}.get(name, 2))
        self.lut = rng.integers(0, 255, size=(256, 4), dtype=np.uint8)

    def _apply(self, arr):
        normed = np.clip((arr - self.vmin) / (self.vmax - self.vmin), 0, 1)
        return self.lut[(normed * 255).astype(np.uint8)]

    def get_config(self) -> dict:
        return {"name": self.name, "vmin": self.vmin, "vmax": self.vmax}


# Layer registry
LAYERS = {
    "ndvi": Sequential(
        [
            _FakeMaskClouds(),
            _FakeNDVI(),
            _FakeColormap(name="RdYlGn", vmin=-1, vmax=1),
        ]
    ),
    "true_color": Sequential(
        [
            _FakeMaskClouds(),
            Lambda(lambda x: np.clip(x / 100.0, 0, 1), name="scale_for_rgb"),
        ]
    ),
}


def tile_handler(layer: str, z: int, x: int, y: int):
    """@app.get('/tiles/{layer}/{z}/{x}/{y}.png') equivalent."""
    # In real code:
    #   gt = catalog.read_tile(mercantile.xy_bounds(x, y, z), size=(256, 256))
    fake_tile = np.linspace(-1, 1, 16).reshape(4, 4) * 10
    rgb = LAYERS[layer](fake_tile)
    # In real code: return Response(rgb.to_png_bytes(), media_type="image/png")
    return f"<png {rgb.shape} for layer={layer} z={z} x={x} y={y}>"


# Simulate a few tile requests
for z, x, y in [(10, 512, 256), (10, 513, 256), (10, 512, 257)]:
    print(tile_handler("ndvi", z, x, y))
<png (4, 4, 4) for layer=ndvi z=10 x=512 y=256>
<png (4, 4, 4) for layer=ndvi z=10 x=513 y=256>
<png (4, 4, 4) for layer=ndvi z=10 x=512 y=257>

Tradeoff. A tile cache (Redis, CloudFront, on-disk) usually matters more than Python-level performance — the same tile gets requested by every user panning the same area. With caching, the operator graph runs once per distinct tile and the API mostly serves bytes. Without caching, every operator’s hot-path performance is exposed. Stateful operators (e.g. learned percentile clip) need careful handling: the state must be loaded once at startup, not per request.

8. Data validation / QC as operators

The setting. Data goes wrong in predictable ways — wrong CRS, wrong resolution, missing bands, all-NaN scenes from a failed download. Catching these at the right pipeline stage is the difference between “model trained on garbage for a week” and “pipeline halted on the first bad scene.” The library contribution is making assertions look like operators: pass-through operators that raise (or warn) on contract violations, droppable anywhere in the chain.

Driver: the same pipeline that does the work runs the assertions — no separate validation framework, no out-of-band tests.

class QCError(Exception):
    pass


class AssertValueRange(Operator):
    def __init__(
        self, *, min_val: float, max_val: float, on_fail: str = "raise"
    ) -> None:
        if on_fail not in ("raise", "warn"):
            raise ValueError("on_fail must be 'raise' or 'warn'")
        self.min, self.max, self.on_fail = min_val, max_val, on_fail

    def _apply(self, x):
        arr = np.asarray(x)
        lo, hi = float(np.nanmin(arr)), float(np.nanmax(arr))
        if lo < self.min or hi > self.max:
            msg = f"range [{lo:.3g}, {hi:.3g}] outside [{self.min}, {self.max}]"
            if self.on_fail == "raise":
                raise QCError(msg)
            print(f"  WARNING: {msg}")
        return x

    def get_config(self) -> dict:
        return {"min_val": self.min, "max_val": self.max, "on_fail": self.on_fail}


class AssertValidFraction(Operator):
    def __init__(self, *, min_valid: float, on_fail: str = "raise") -> None:
        self.min_valid, self.on_fail = min_valid, on_fail

    def _apply(self, x):
        arr = np.asarray(x, dtype=float)
        valid = float(np.mean(~np.isnan(arr)))
        if valid < self.min_valid:
            msg = f"valid fraction {valid:.1%} below {self.min_valid:.0%}"
            if self.on_fail == "raise":
                raise QCError(msg)
            print(f"  WARNING: {msg}")
        return x

    def get_config(self) -> dict:
        return {"min_valid": self.min_valid, "on_fail": self.on_fail}


# Stack of assertions reused at multiple checkpoints
qc = Sequential(
    [
        AssertValueRange(min_val=0, max_val=10_000, on_fail="warn"),
        AssertValidFraction(min_valid=0.5, on_fail="warn"),
    ]
)

# Drop into ETL like a tracer round
etl_with_qc = Sequential(
    [
        _FakeMaskClouds(),
        qc,  # first checkpoint
        _FakeToBOA(),
        qc,  # again, post-correction
        _FakeNDVI(),
    ]
)

# Good input — silent
print("good input (silent):")
out = etl_with_qc(np.array([10.0, 50.0, 200.0, 1000.0]))
print("  result:", out)

# Bad input — warnings fire
print("\nbad input (warnings fire):")
out = etl_with_qc(np.array([10.0, 50.0, 200.0, 99_999.0]))
print("  result:", out)

# Strict mode
print("\nstrict mode raises:")
strict_qc = AssertValueRange(min_val=0, max_val=100)
try:
    strict_qc(np.array([5, 25, 9999]))
except QCError as e:
    print(f"  caught: {e}")
good input (silent):
  result: [  1.02   5.1   20.4  102.  ]

bad input (warnings fire):
  WARNING: range [10, 1e+05] outside [0, 10000]
  WARNING: range [10.2, 1.02e+05] outside [0, 10000]
  result: [1.0200000e+00 5.1000000e+00 2.0400000e+01 1.0199898e+04]

strict mode raises:
  caught: range [5, 1e+04] outside [0, 100]

Tradeoff. Where a QC step fires (raise vs warn) is a per-deployment policy. A research notebook wants on_fail="warn" so exploration isn’t interrupted; a production ETL wants on_fail="raise" plus alerting. The same YAML can ship with on_fail="${qc_policy}" and let the loader inject the right value. The performance cost is real — full-array reductions (nanmin, nanmax) on large carriers aren’t free. For tile-server hot paths, sample a small fraction of pixels rather than the full array.

9. Pinned / hashed regulatory artifact

The setting. A scientific paper, a regulatory submission, or an attribution claim from operational monitoring needs to be byte-exact reproducible years later. “Re-run the pipeline” requires three things to be pinned together: the operator graph, the inputs (with content hashes, not just URIs), and the dependency stack. Treat the operator graph as the deliverable, not just a way to compute one.

Driver: publication time → Artifact.freeze(pipeline, inputs, deps_lock) → tarball goes into archive. Years later → Artifact.load(path).rerun().

class Artifact:
    """Reproducibility envelope around an Operator graph.

    In real life this is a tarball of (pipeline_yaml, hashes, deps_lock,
    metadata.json). Here we keep it in-memory.
    """

    def __init__(
        self, *, pipeline_cfg, pipeline_hash, inputs, deps_lock, metadata
    ) -> None:
        self.pipeline_cfg = pipeline_cfg
        self.pipeline_hash = pipeline_hash
        self.inputs = inputs
        self.deps_lock = deps_lock
        self.metadata = metadata

    @classmethod
    def freeze(
        cls, pipeline: Operator, *, inputs: dict, deps_lock: str, metadata: dict
    ) -> "Artifact":
        cfg = pipeline.get_config()
        canon = _json.dumps(cfg, sort_keys=True)
        return cls(
            pipeline_cfg=cfg,
            pipeline_hash=hashlib.sha256(canon.encode()).hexdigest(),
            inputs=inputs,
            deps_lock=deps_lock,
            metadata=metadata,
        )


# === At publication time ===
pipeline = Sequential(
    [
        _FakeMaskClouds(qa_band=-1, bits=[10, 11]),
        _FakeDarkObject(percentile=1),
        _FakeNDVI(red_idx=2, nir_idx=3),
    ]
)

scene_payload = np.array([10.0, -5.0, 50.0, 100.0])

artifact = Artifact.freeze(
    pipeline,
    inputs={
        "scene_uri": "s3://imeo/emit/2024Q3/scene_001.nc",
        "scene_sha256": hashlib.sha256(scene_payload.tobytes()).hexdigest(),
    },
    deps_lock="geotoolz==0.0.1\nnumpy==1.26.0\n# ... full lockfile here ...",
    metadata={"author": "ej", "date": "2024-09-15", "doi": "10.xxxx/mars-2024Q3"},
)

print("pipeline hash:", artifact.pipeline_hash[:16] + "...")
print("input hash   :", artifact.inputs["scene_sha256"][:16] + "...")
print("metadata     :", artifact.metadata)

# === Years later: re-instantiate and re-run ===
# In real code, gz.repro.load() rebuilds the pipeline from artifact.pipeline_cfg
# via the same sandboxed loader as case 6.
print()
print("Re-running yields identical output:")
print("  run 1:", pipeline(scene_payload))
print("  run 2:", pipeline(scene_payload))
pipeline hash: 990c9d4a5c66301a...
input hash   : e3cf3258a1a91089...
metadata     : {'author': 'ej', 'date': '2024-09-15', 'doi': '10.xxxx/mars-2024Q3'}

Re-running yields identical output:
  run 1: [ 0.97 -0.03  4.97  9.97]
  run 2: [ 0.97 -0.03  4.97  9.97]

Tradeoff. A poetry.lock alone is not enough for true byte-exactness — BLAS implementations, GPU drivers, libc versions, even the order in which floating-point reductions are evaluated all affect numerical output. Honest framing: the artifact guarantees the operator graph and inputs; it aspires to numerical reproducibility, with the deps_lock as a best-effort. For genuine bit-exactness, ship a Docker image hash alongside the artifact. External URIs decay over years; for true long-term archive, the artifact should optionally include the bytes of small inputs, not just their hashes.

10. Active learning / uncertainty-driven sampling

The setting. Labels are expensive (an expert annotator costs more than a GPU-hour). The classic active-learning loop: train a model, score the unlabelled pool by uncertainty, send the highest-uncertainty examples for labelling, retrain. The library contribution: a sampler that consumes operator output to decide what to sample next.

Driver: training-loop orchestrator → ActiveSamplerbase_sampler ∘ score_op → top-K. Same composition primitives, but the iterator is now informed by inference.

class _FakeUQModel:
    """Stand-in for a model that returns (prediction, uncertainty)."""

    def predict_with_uncertainty(self, arr):
        # Dummy: prediction = mean, sigma = std
        return {"pred": float(np.mean(arr)), "sigma": float(np.std(arr))}


class _ExtractSigma(Operator):
    """GeoTensor-like → scalar (just the uncertainty field)."""

    def _apply(self, output_dict):
        return output_dict["sigma"]

    def get_config(self) -> dict:
        return {}


# A scoring pipeline that ends in a scalar (the uncertainty estimate)
score_pipeline = Sequential(
    [
        _FakeMaskClouds(),
        ModelOp(_FakeUQModel(), method="predict_with_uncertainty"),
        _ExtractSigma(),
    ]
)


class ActiveSampler:
    """Stand-in for ``gz.sampling.ActiveSampler``."""

    def __init__(
        self, *, base: FakeCatalog, score_op: Operator, k: int = 5, mode: str = "top"
    ) -> None:
        self.base, self.score_op, self.k, self.mode = base, score_op, k, mode

    def __call__(self) -> list:
        scored = []
        for row in self.base.iter_rows():
            score = float(self.score_op(row.payload))
            scored.append((row, score))
        scored.sort(key=lambda s: s[1], reverse=(self.mode == "top"))
        return scored[: self.k]


catalog = make_demo_catalog(n=12)
sampler = ActiveSampler(base=catalog, score_op=score_pipeline, k=3, mode="top")
top_uncertain = sampler()

print("Top 3 most uncertain scenes (highest sigma):")
for row, sigma in top_uncertain:
    print(f"  {row.id}  sigma={sigma:.3f}  payload={row.payload.tolist()}")
Top 3 most uncertain scenes (highest sigma):
  scene_008  sigma=35.089  payload=[7, 86, 0, 53]
  scene_005  sigma=32.058  payload=[26, 81, 66, -2]
  scene_006  sigma=30.270  payload=[38, 85, 54, 1]

Tradeoff. The dirty secret of active learning: scoring is itself expensive — you’re running model inference over the entire unlabelled pool just to pick the next batch. For large catalogs this dominates the labelling budget you were trying to save. Mitigations: random sub-sampling before scoring (score 10% of the pool, label the top 1%), or tiered scoring (cheap heuristic filters first, expensive model second). Pure top-k by uncertainty also under-explores — pair with a mode="diverse" option that uses a clustering step on chip embeddings before picking, to avoid a top-100 that’s all one land-cover type.

11. Cross-sensor fusion (the Graph case)

The setting. Combining modalities — radar with optical, optical with DEM, optical with weather reanalysis. Each input has different spatial extent, resolution, CRS, and acquisition time, and the fusion rule isn’t a single linear chain. This is where Sequential runs out and Graph earns its keep.

Driver: the applied scientist describes the DAG declaratively (opt → mask → align ← sar) instead of as a tangle of intermediate variables. The same Graph can serialise to YAML and run in any deployment shape from the preceding cases.

class _FakeLeeSpeckle(Operator):
    def __init__(self, *, window: int = 7) -> None:
        self.window = window

    def _apply(self, x):
        # Dummy: 'smooth' by mean
        arr = np.asarray(x, dtype=float)
        return arr - arr.mean() + 5.0  # standin

    def get_config(self) -> dict:
        return {"window": self.window}


class _FakeAlignAndStack(Operator):
    """Stand-in for the v0.2+ fusion op. Stacks aligned inputs along a new axis."""

    def __init__(self, *, target_grid: str = "optical") -> None:
        self.target_grid = target_grid

    def _apply(self, *inputs):
        # Dummy: just stack as 1-D arrays
        return np.stack([np.asarray(x).flatten()[:4] for x in inputs])

    def get_config(self) -> dict:
        return {"target_grid": self.target_grid}


class _FakeCropClassifier:
    def __call__(self, stack):
        return {
            "class_id": int(np.argmax(stack.sum(axis=0))),
            "scores": stack.sum(axis=0).tolist(),
        }


# Build the graph
opt = Input("optical")
sar = Input("sar")
dem = Input("dem")

opt_clean = _FakeMaskClouds(qa_band=-1, bits=[10, 11])(opt)
sar_smooth = _FakeLeeSpeckle(window=7)(sar)
fused = _FakeAlignAndStack(target_grid="optical")(opt_clean, sar_smooth, dem)
classify = ModelOp(_FakeCropClassifier())(fused)

g = Graph(
    inputs={"optical": opt, "sar": sar, "dem": dem},
    outputs={
        "crop_map": classify,
        "fused_stack": fused,  # multiple outputs are free
    },
)

print("graph:", g)
print()

# Run with dummy inputs
result = g(
    optical=np.array([10, 20, -5, 100]),
    sar=np.array([-3, -7, -2, -8]),
    dem=np.array([100, 110, 120, 105]),
)
print("crop_map   :", result["crop_map"])
print("fused_stack:\n", result["fused_stack"])
graph: Graph(inputs=[optical, sar, dem], outputs=[crop_map, fused_stack])

crop_map   : {'class_id': 3, 'scores': [117.0, 133.0, 128.0, 207.0]}
fused_stack:
 [[ 10.  20.   0. 100.]
 [  7.   3.   8.   2.]
 [100. 110. 120. 105.]]

Tradeoff. AlignAndStack is the load-bearing operator and the place fusion pipelines silently fail — picking which input defines the target grid (target_grid="optical") discards information from the other inputs (SAR gets resampled to the optical grid, losing the SAR’s native resolution). For some workflows that’s exactly right; for others (super-resolution from SAR to optical, for example) the choice is wrong. Make the choice explicit and test it. Multi-input arity also complicates Operator validation — a Graph with a missing input fails at runtime, not construction time, unless you add explicit input-presence checks.

12. Workflow orchestrator task units

The setting. Production scheduling — Airflow, Prefect, Dagster. The orchestrator owns retry, parallelism, monitoring, alerting, and DAG topology; the operator graph owns the science. The interface between them is small: pickleable task units that the orchestrator can submit to workers.

Driver: Prefect / Airflow / Dagster flow definition. Each task wraps (op_yaml_path, slice, source_uri) and reconstructs the operator graph on the worker side.

The example below shows a Prefect-style task wrapper as a plain function (no decorators) so it runs in this environment. In real code you’d add @task(retries=3, retry_delay_seconds=60).

# In real Prefect code:
#   @task(retries=3, retry_delay_seconds=60)
def run_op(op_factory, slice_dict: dict, source_uri: str) -> dict:
    """Task: reconstruct the op on the worker, run it on one slice."""
    # In real code: op = gz.serialization.load_yaml(op_yaml_path)
    op = op_factory()
    # In real code: gt = georeader.RasterioReader.open(source_uri).read_geoslice(...)
    gt = np.asarray(slice_dict["payload"])
    out = op(gt)
    out_uri = fake_save_cog(out, f"s3://daily/{slice_dict['id']}.tif")
    return {"out_uri": out_uri, "stats_mean": float(np.mean(out))}


# In real Prefect code:
#   @flow
def daily_methane_flow(date: str) -> list:
    """Flow: one task per slice in the catalog."""
    catalog = make_demo_catalog(n=5)

    # The op factory is pickleable — closures over module-level
    # classes are fine. Avoid closures over local mutable state.
    def build_op():
        return Sequential(
            [
                _FakeMaskClouds(qa_band=-1, bits=[10, 11]),
                _FakeDarkObject(percentile=1),
                _FakeNDVI(red_idx=2, nir_idx=3),
            ]
        )

    results = []
    for row in catalog.iter_rows():
        # In real Prefect:
        #   futures.append(run_op.submit(build_op, slice_dict, source_uri))
        slice_dict = {"id": row.id, "payload": row.payload.tolist()}
        results.append(run_op(build_op, slice_dict, f"s3://bucket/{row.id}.tif"))

    return results


print("Flow output:")
for r in daily_methane_flow("2024-09-15"):
    print(f"  {r}")
Flow output:
  {'out_uri': 's3://daily/scene_000.tif', 'stats_mean': 2.95}
  {'out_uri': 's3://daily/scene_001.tif', 'stats_mean': 0.8940000000000001}
  {'out_uri': 's3://daily/scene_002.tif', 'stats_mean': 4.603}
  {'out_uri': 's3://daily/scene_003.tif', 'stats_mean': 1.9950000000000003}
  {'out_uri': 's3://daily/scene_004.tif', 'stats_mean': 1.2689999999999997}

Tradeoff. Pickling crosses worker boundaries, so the same constraints as case 2 apply — no closures, no captured RNGs, no unbounded factories. Retries need to be idempotent: if run_op is retried, the output COG path is the same and is overwritten, not duplicated. State that lives outside the operator graph (database writes, message queues) is the orchestrator’s problem — don’t bake side effects into operators. Alerting on failure rate (rather than first failure) is appropriate at the orchestrator layer; an operator-level on_fail="raise" would defeat that.

13. Pipeline diffing / A-B regression

The setting. You’re about to roll out a new version of a production pipeline — a tweaked atmospheric correction, a new cloud-mask threshold, a calibration update. The release-management question: did anything regress that I didn’t intend? Because operator graphs serialise, you can both diff them (config-level) and run them side-by-side on the same input (numerical-level).

Driver: release engineer or CI job runs diff(old, new) (textual config diff) and AB(old, new).run(catalog) (numerical comparison).

import difflib


def config_diff(op_a: Operator, op_b: Operator) -> str:
    """Human-readable diff of two operator graphs' configs."""
    a_lines = _json.dumps(op_a.get_config(), indent=2, sort_keys=True).splitlines()
    b_lines = _json.dumps(op_b.get_config(), indent=2, sort_keys=True).splitlines()
    diff = difflib.unified_diff(a_lines, b_lines, lineterm="", n=2)
    return "\n".join(diff)


class _MeanAbsDiff:
    def __call__(self, a, b):
        return float(np.mean(np.abs(np.asarray(a) - np.asarray(b))))


class AB:
    """Run two pipelines on the same inputs; report per-slice metric."""

    def __init__(self, op_a: Operator, op_b: Operator, *, metric) -> None:
        self.a, self.b, self.metric = op_a, op_b, metric

    def run(self, catalog) -> dict:
        rows = []
        for row in catalog.iter_rows():
            gt = row.payload
            ya, yb = self.a(gt), self.b(gt)
            rows.append((row.id, float(self.metric(ya, yb))))
        rows.sort(key=lambda r: -r[1])
        return {
            "n": len(rows),
            "mean_metric": float(np.mean([r[1] for r in rows])),
            "max_metric": rows[0][1],
            "max_slice": rows[0][0],
            "per_slice": rows,
        }


old = Sequential(
    [
        _FakeMaskClouds(),
        _FakeDarkObject(percentile=1),  # old: percentile=1
        _FakeNDVI(),
    ]
)

new = Sequential(
    [
        _FakeMaskClouds(),
        _FakeDarkObject(percentile=2),  # new: percentile=2
        _FakeLeeSpeckle(window=5),  # new: added pre-step
        _FakeNDVI(),
    ]
)


print("=== Config diff (textual) ===")
print(config_diff(old, new))
print()

print("=== Numerical A/B over the catalog ===")
catalog = make_demo_catalog(n=5)
report = AB(old, new, metric=_MeanAbsDiff()).run(catalog)
print(f"n          : {report['n']}")
print(f"mean diff  : {report['mean_metric']:.4g}")
print(f"max diff   : {report['max_metric']:.4g} ({report['max_slice']})")
print()
print("per-slice (sorted by diff):")
for sid, d in report["per_slice"]:
    print(f"  {sid}  diff={d:.4g}")
=== Config diff (textual) ===
--- 
+++ 
@@ -14,5 +14,11 @@
       "class": "_FakeDarkObject",
       "config": {
-        "percentile": 1
+        "percentile": 2
+      }
+    },
+    {
+      "class": "_FakeLeeSpeckle",
+      "config": {
+        "window": 5
       }
     },

=== Numerical A/B over the catalog ===
n          : 5
mean diff  : 1.842
max diff   : 4.103 (scene_002)

per-slice (sorted by diff):
  scene_002  diff=4.103
  scene_000  diff=2.45
  scene_003  diff=1.495
  scene_004  diff=0.769
  scene_001  diff=0.394

Tradeoff. Mean RMSE alone can hide bimodal regressions — most slices fine, a few catastrophically worse. Always look at per-tile distributions and the worst-N tiles, not just aggregate stats. Choosing the metric is itself a design decision: RMSE is wrong for probability outputs (use cross-entropy or KL), wrong for hard classifications (use confusion-matrix shifts). The config_diff is human-readable but doesn’t capture semantic difference — a parameter rename that’s mathematically equivalent shows up as a diff anyway. Pair config-level diffing with numerical AB testing, and trust the numerics.

The pattern

The operator graph is useful wherever you’d otherwise hand-roll glue between input → transform → output. What changes between cases is who drives: a notebook user, an HTTP request, a scheduler, a sampler, a tile renderer, or another graph. The library’s job is to keep the graph itself a stable artifact across all of them — written once, audited once, run anywhere.

Where next