
API Reference — Core

The composition algebra. Importable as geotoolz.core.* and re-exported at geotoolz.* for convenience.

For the model behind these primitives, read the Concepts page first.

Base

geotoolz.core._src.operator.Operator

Base class for geotoolz operators.

Subclasses implement _apply(self, *args, **kwargs). The base class dispatches __call__ to either _apply (eager mode) or Node construction (graph-building mode) based on whether any positional argument is a Node / Input. Keyword arguments are not inspected.

Attributes:

forbid_in_yaml (bool): True on subclasses that hold non-serialisable user state (callables, closures, runtime references). Future YAML loaders should refuse to instantiate flagged operators, and YAML dumpers should refuse to serialise graphs containing them. Default False; most operators round-trip.

_terminal (bool): True on subclasses that legitimately return None (or otherwise break the GeoTensor → GeoTensor contract). Sequential rejects terminal operators in any position except the last. Default False.

Examples:

Implement a tiny operator::

class Scale(Operator):
    def __init__(self, factor: float) -> None:
        self.factor = factor

    def _apply(self, gt):
        return gt * self.factor

    def get_config(self) -> dict:
        return {"factor": self.factor}

Compose with |::

pipeline = Scale(0.5) | Scale(2.0)   # Sequential([Scale(0.5), Scale(2.0)])
result = pipeline(gt)
Source code in src/geotoolz/core/_src/operator.py
class Operator:
    """Base class for geotoolz operators.

    Subclasses implement ``_apply(self, *args, **kwargs)``. The base
    class dispatches ``__call__`` to either ``_apply`` (eager mode) or
    `Node` construction (graph-building mode) based on whether any
    positional argument is a `Node` / `Input`.

    Attributes:
        forbid_in_yaml: ``True`` on subclasses that hold non-serialisable
            user state (callables, closures, runtime references). Future
            YAML loaders should refuse to instantiate flagged operators
            and YAML dumpers should refuse to serialise graphs containing
            them. Default ``False`` — most operators round-trip.
        _terminal: ``True`` on subclasses that legitimately return ``None``
            (or otherwise break the ``GeoTensor → GeoTensor`` contract).
            ``Sequential`` rejects terminal operators in any position
            except the last. Default ``False``.

    Examples:
        Implement a tiny operator::

            class Scale(Operator):
                def __init__(self, factor: float) -> None:
                    self.factor = factor

                def _apply(self, gt):
                    return gt * self.factor

                def get_config(self) -> dict:
                    return {"factor": self.factor}

        Compose with ``|``::

            pipeline = Scale(0.5) | Scale(2.0)   # Sequential([Scale(0.5), Scale(2.0)])
            result = pipeline(gt)
    """

    forbid_in_yaml: ClassVar[bool] = False
    _terminal: ClassVar[bool] = False

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """Dispatch on argument type.

        If any positional argument is a graph node, returns a new ``Node``
        (graph-building mode). Otherwise calls ``_apply`` (eager mode).
        Subclasses should override ``_apply``, not ``__call__``.
        """
        # Lazy import to avoid circular dependency with graph.py
        from geotoolz.core._src.graph import Input, Node

        if any(isinstance(a, (Node, Input)) for a in args):
            return Node(operator=self, parents=tuple(args))
        out = self._apply(*args, **kwargs)
        # Reserved post-apply hook dispatch (Spy/Hook family lands in v0.2).
        # Keeping the call site here means adding hooks later won't require
        # editing every Operator subclass.
        self._dispatch_post_apply_hooks(args, out)
        return out

    def _apply(self, *args: Any, **kwargs: Any) -> Any:
        """Implement the operator's behaviour. Override in subclasses."""
        raise NotImplementedError(f"{type(self).__name__} must implement _apply().")

    def _dispatch_post_apply_hooks(self, args: tuple[Any, ...], out: Any) -> None:
        """No-op in v0.1. Reserved for Spy / observer hook dispatch in v0.2."""
        return None

    def get_config(self) -> dict[str, Any]:
        """Return a JSON-serialisable dict of constructor args.

        Override in subclasses to enable ``__repr__``, pickling assertions,
        and Hydra-zen ``builds()`` integration. Operators that hold user
        closures should still return what config they can but additionally
        set ``forbid_in_yaml = True``.
        """
        return {}

    @property
    def state(self) -> dict[str, Any]:
        """Return a JSON-serialisable state record for this operator."""
        return {
            "module": type(self).__module__,
            "class": type(self).__name__,
            "config": self.get_config(),
        }

    @classmethod
    def from_state(cls, state: dict[str, Any]) -> Operator:
        """Reconstruct an operator from a ``state`` record.

        Walks ``cls`` plus all transitive subclasses so calling
        ``Subclass.from_state(state)`` resolves ``Subclass`` itself. Only
        operators whose ``config`` is composed of JSON primitives are
        reconstructible — operators that nest other operators in their
        config (e.g. ``AppendIndex``) emit debug/provenance payloads and
        raise a clear ``RuntimeError`` here instead of letting the
        constructor blow up with ``TypeError``.
        """
        module_name = state.get("module")
        class_name = state.get("class")
        config = state.get("config", {})
        if not isinstance(module_name, str) or not module_name.startswith("geotoolz."):
            raise ValueError("Operator state must include a geotoolz module")
        if not isinstance(class_name, str):
            raise ValueError("Operator state must include a class name")
        if not isinstance(config, dict):
            raise ValueError("Operator state config must be a dictionary")

        for op_type in (cls, *_operator_subclasses(cls)):
            if op_type.__module__ == module_name and op_type.__name__ == class_name:
                non_primitive_keys = [
                    k for k, v in config.items() if not _is_json_primitive(v)
                ]
                if non_primitive_keys:
                    raise RuntimeError(
                        f"from_state cannot reconstruct {op_type.__name__}: "
                        f"config contains non-primitive values "
                        f"{non_primitive_keys}. Use the regular constructor."
                    )
                return op_type(**config)
        raise TypeError(f"{module_name}.{class_name} is not a loaded Operator")

    def __repr__(self) -> str:
        params = ", ".join(f"{k}={v!r}" for k, v in self.get_config().items())
        return f"{type(self).__name__}({params})"

    def __or__(self, other: Operator) -> Sequential:
        """``op1 | op2`` returns ``Sequential([op1, op2])``.

        Flattens a ``Sequential`` on the right-hand side, so
        ``a | (b | c)`` yields a single three-element ``Sequential``;
        ``Sequential.__or__`` flattens the left-hand case, ``(a | b) | c``.
        """
        # Lazy import: Sequential subclasses Operator.
        from geotoolz.core._src.sequential import Sequential

        if isinstance(other, Sequential):
            return Sequential([self, *other.operators])
        return Sequential([self, other])

state: dict[str, Any] property

Return a JSON-serialisable state record for this operator.

__call__(*args: Any, **kwargs: Any) -> Any

Dispatch on argument type.

If any positional argument is a graph node, returns a new Node (graph-building mode). Otherwise calls _apply (eager mode). Subclasses should override _apply, not __call__.
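
The dispatch rule can be illustrated with a minimal, self-contained sketch. It assumes trimmed stand-ins for Input and Node plus two hypothetical classes, ToyOperator and Double; this is a toy model of the rule, not the geotoolz implementation. A plain value runs _apply immediately, while a placeholder yields a Node that records the operator and its parents.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass(eq=False)
class Input:
    # Toy stand-in: a named placeholder, compared by identity (eq=False).
    name: str
    parents: tuple = field(default_factory=tuple)


@dataclass(eq=False)
class Node:
    # Toy stand-in: records which operator to run and on which parents.
    operator: Any
    parents: tuple


class ToyOperator:
    # Hypothetical base class copying only the __call__ dispatch rule.
    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # Graph mode: any positional Input/Node builds a Node instead of computing.
        if any(isinstance(a, (Node, Input)) for a in args):
            return Node(operator=self, parents=tuple(args))
        # Eager mode: compute right away.
        return self._apply(*args, **kwargs)

    def _apply(self, *args: Any, **kwargs: Any) -> Any:
        raise NotImplementedError


class Double(ToyOperator):
    # Hypothetical operator used only for illustration.
    def _apply(self, x):
        return 2 * x


eager = Double()(21)      # runs _apply immediately
img = Input("image")
lazy = Double()(img)      # builds a Node; nothing is computed yet
```

As in the source above, only positional arguments are checked, so a placeholder passed by keyword would reach _apply instead of triggering graph mode.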

__or__(other: Operator) -> Sequential

op1 | op2 returns Sequential([op1, op2]).

Flattens a Sequential on the right-hand side, so a | (b | c) produces a single three-element Sequential. The left-hand case, (a | b) | c, is flattened by the Sequential.__or__ override.
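
Both associativity paths can be checked with a toy model. It assumes two hypothetical stand-ins, Op and Seq, that copy only the flattening logic of Operator.__or__ and Sequential.__or__; they are not the geotoolz classes.

```python
class Op:
    # Toy stand-in for Operator.__or__: flatten a Seq on the right-hand side.
    def __or__(self, other):
        if isinstance(other, Seq):
            return Seq([self, *other.ops])
        return Seq([self, other])


class Seq(Op):
    # Toy stand-in for Sequential: flattens on the left, and on both sides
    # when the right-hand operand is also a Seq.
    def __init__(self, ops):
        self.ops = list(ops)

    def __or__(self, other):
        if isinstance(other, Seq):
            return Seq([*self.ops, *other.ops])
        if not isinstance(other, Op):
            return NotImplemented
        return Seq([*self.ops, other])


a, b, c = Op(), Op(), Op()
left = (a | b) | c    # Seq.__or__ appends c to [a, b]
right = a | (b | c)   # Op.__or__ splices the right-hand Seq
```

Either grouping ends up as one flat three-step pipeline, which keeps repr and indexing predictable.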

from_state(state: dict[str, Any]) -> Operator classmethod

Reconstruct an operator from a state record.

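Walks cls plus all of its transitive subclasses, so Subclass.from_state(state) resolves Subclass itself and Operator.from_state(state) can resolve any loaded subclass. Only operators whose config consists of JSON primitives are reconstructible. Operators that nest other operators in their config (e.g. AppendIndex) produce a debug/provenance payload rather than constructor arguments, so from_state raises a clear RuntimeError instead of letting the constructor fail with a TypeError. A state whose module does not start with "geotoolz.", or that lacks a string class name or a dict config, raises ValueError; a state naming a class that is not loaded raises TypeError.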
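
The round trip can be sketched as follows. This is a toy model under stated assumptions: Op, Scale, Wrap, is_json_primitive, and all_subclasses are hypothetical names, the sketch skips the "geotoolz." module check, and it assumes "JSON primitive" means a scalar or a flat list of scalars (the real _is_json_primitive is not shown on this page).

```python
from typing import Any

JSON_SCALARS = (str, int, float, bool, type(None))


def is_json_primitive(value: Any) -> bool:
    # Assumption: scalars, or flat lists/tuples of scalars, count as primitive.
    if isinstance(value, (list, tuple)):
        return all(isinstance(v, JSON_SCALARS) for v in value)
    return isinstance(value, JSON_SCALARS)


def all_subclasses(cls: type) -> list:
    # Transitive subclasses, depth-first (hypothetical helper).
    out = []
    for sub in cls.__subclasses__():
        out.append(sub)
        out.extend(all_subclasses(sub))
    return out


class Op:
    # Toy stand-in for Operator's state/from_state pair.
    def get_config(self) -> dict:
        return {}

    @property
    def state(self) -> dict:
        return {"module": type(self).__module__, "class": type(self).__name__,
                "config": self.get_config()}

    @classmethod
    def from_state(cls, state: dict) -> "Op":
        config = state.get("config", {})
        for op_type in (cls, *all_subclasses(cls)):
            if (op_type.__module__, op_type.__name__) == (state["module"], state["class"]):
                bad = [k for k, v in config.items() if not is_json_primitive(v)]
                if bad:
                    raise RuntimeError(f"cannot reconstruct {op_type.__name__}: {bad}")
                return op_type(**config)
        raise TypeError(f"{state['module']}.{state['class']} is not a loaded Op")


class Scale(Op):
    def __init__(self, factor: float) -> None:
        self.factor = factor

    def get_config(self) -> dict:
        return {"factor": self.factor}


class Wrap(Op):
    # Nests another operator, so its config is a payload, not constructor args.
    def __init__(self, inner: Op) -> None:
        self.inner = inner

    def get_config(self) -> dict:
        return {"inner": {"class": type(self.inner).__name__,
                          "config": self.inner.get_config()}}


restored = Op.from_state(Scale(0.5).state)
```

Calling from_state on the base class still finds Scale, because the lookup walks every loaded subclass.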

get_config() -> dict[str, Any]

Return a JSON-serialisable dict of constructor args.

Override in subclasses to enable __repr__, pickling assertions, and Hydra-zen builds() integration. Operators that hold user closures should still return what config they can but additionally set forbid_in_yaml = True.
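
The link between get_config and __repr__ is direct: the repr is built from whatever get_config reports. The sketch below copies the __repr__ shape shown above onto a toy base class; Op, Clip, and NoConfig are hypothetical names used only for illustration.

```python
class Op:
    # Toy stand-in reproducing Operator.get_config / __repr__.
    def get_config(self) -> dict:
        return {}

    def __repr__(self) -> str:
        # Same shape as Operator.__repr__: ClassName(k=v, ...).
        params = ", ".join(f"{k}={v!r}" for k, v in self.get_config().items())
        return f"{type(self).__name__}({params})"


class Clip(Op):
    # Hypothetical operator that overrides get_config.
    def __init__(self, low: float, high: float) -> None:
        self.low, self.high = low, high

    def get_config(self) -> dict:
        return {"low": self.low, "high": self.high}


class NoConfig(Op):
    # Hypothetical operator that forgets to override get_config.
    def __init__(self, threshold: float) -> None:
        self.threshold = threshold  # invisible to repr and to state
```

An operator that skips the override still works, but its repr and state record lose every constructor argument.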

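geotoolz.core._src.operator.Carrier = Any (module attribute)

Type alias used to annotate the value threaded through operators (e.g. the gt argument of _apply). It is currently Any, so it documents intent rather than constraining types.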

Linear composition

geotoolz.core._src.sequential.Sequential

Bases: Operator

Apply a list of operators in order, threading the output of each into the next.

Parameters:

operators (list[Operator], required): A list of Operator instances. An empty list is allowed; calling an empty Sequential on an input returns that input unchanged, while calling it with no input raises TypeError.

Raises:

TypeError: if any element of operators is not an Operator, or if any element except the last is marked _terminal.

Examples:

Basic::

pipe = Sequential([Scale(0.5), Scale(2.0)])
assert np.allclose(pipe(gt), gt)  # 0.5 * 2.0 == 1.0; arrays compare element-wise

Via the pipe operator::

pipe = Scale(0.5) | Scale(2.0)

Terminal op at the end is fine::

pipe = Sequential([NDVI(...), WriteCOG("/out.tif")])

Terminal op anywhere else raises::

Sequential([WriteCOG("/x"), NDVI(...)])  # TypeError
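
The terminal-position rule and the output threading can be modelled with plain numbers. This is a toy sketch, not the geotoolz Sequential: Op, Scale, Write, and Seq are hypothetical stand-ins, and Write plays the role of a terminal writer such as WriteCOG by recording its input and returning None.

```python
from typing import Any


class Op:
    # Toy stand-in for Operator: eager mode only.
    _terminal = False

    def __call__(self, x: Any) -> Any:
        return self._apply(x)


class Scale(Op):
    def __init__(self, factor: float) -> None:
        self.factor = factor

    def _apply(self, x: float) -> float:
        return x * self.factor


class Write(Op):
    # Hypothetical terminal operator: records the value and returns None.
    _terminal = True

    def __init__(self) -> None:
        self.written = []

    def _apply(self, x: Any) -> None:
        self.written.append(x)
        return None


class Seq(Op):
    def __init__(self, ops: list) -> None:
        # Same rule as Sequential: terminal ops are only allowed last.
        for i, op in enumerate(ops[:-1]):
            if op._terminal:
                raise TypeError(f"Seq[{i}] is terminal; only the last step may be.")
        self.ops = list(ops)

    def _apply(self, x: Any) -> Any:
        out = x
        for op in self.ops:  # thread each output into the next op
            out = op(out)
        return out
```

Checking at construction time means a misplaced writer fails when the pipeline is built rather than halfway through a run, when the next step would receive None.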
Source code in src/geotoolz/core/_src/sequential.py
class Sequential(Operator):
    """Apply a list of operators in order, threading the output of each
    into the next.

    Args:
        operators: A list of `Operator` instances. Empty list is allowed;
            calling an empty `Sequential` is the identity.

    Raises:
        TypeError: if any element of ``operators`` is not an `Operator`,
            or if any element except the last is marked ``_terminal``.

    Examples:
        Basic::

            pipe = Sequential([Scale(0.5), Scale(2.0)])
            assert np.allclose(pipe(gt), gt)  # 0.5 * 2.0 == 1.0

        Via the pipe operator::

            pipe = Scale(0.5) | Scale(2.0)

        Terminal op at the end is fine::

            pipe = Sequential([NDVI(...), WriteCOG("/out.tif")])

        Terminal op anywhere else raises::

            Sequential([WriteCOG("/x"), NDVI(...)])  # TypeError
    """

    def __init__(self, operators: list[Operator]) -> None:
        for i, op in enumerate(operators):
            if not isinstance(op, Operator):
                raise TypeError(
                    f"Sequential[{i}] is {type(op).__name__}, expected Operator."
                )
        for i, op in enumerate(operators[:-1]):
            if op._terminal:
                raise TypeError(
                    f"Sequential[{i}] is a terminal operator "
                    f"({type(op).__name__}) — terminal operators are only "
                    "valid as the last step of a Sequential."
                )
        self.operators = list(operators)

    def _apply(self, gt: Carrier = _MISSING) -> Any:
        # Return type stays ``Any`` rather than ``Carrier``: a Sequential
        # may legitimately reduce (e.g. end in a ``Mean`` that returns a
        # scalar) so we can't promise the carrier shape survives.
        if gt is _MISSING and not self.operators:
            raise TypeError(
                "Sequential([]) cannot be called without an input: "
                "empty pipeline has no operations to perform."
            )
        if gt is _MISSING:
            out = self.operators[0]()
            operators = self.operators[1:]
        else:
            out = gt
            operators = self.operators
        for op in operators:
            out = op(out)
        return out

    def get_config(self) -> dict[str, Any]:
        return {
            "operators": [
                {"class": type(op).__name__, "config": op.get_config()}
                for op in self.operators
            ]
        }

    def __or__(self, other: Operator) -> Sequential:
        """Append on the right; flatten nested `Sequential`s.

        ``Sequential([a, b]) | c`` → ``Sequential([a, b, c])``
        ``Sequential([a, b]) | Sequential([c, d])`` → ``Sequential([a, b, c, d])``
        """
        if isinstance(other, Sequential):
            return Sequential([*self.operators, *other.operators])
        if not isinstance(other, Operator):
            return NotImplemented
        return Sequential([*self.operators, other])

    def __repr__(self) -> str:
        if not self.operators:
            return "Sequential([])"
        inner = ", ".join(repr(op) for op in self.operators)
        return f"Sequential([{inner}])"

    def __len__(self) -> int:
        return len(self.operators)

    def __getitem__(self, key: int | slice) -> Operator | Sequential:
        """Index or slice into the underlying operator list.

        ``pipe[0]`` returns the first operator. ``pipe[1:3]`` returns a
        new `Sequential` containing the slice.
        """
        if isinstance(key, slice):
            return Sequential(self.operators[key])
        return self.operators[key]

__getitem__(key: int | slice) -> Operator | Sequential

Index or slice into the underlying operator list.

pipe[0] returns the first operator. pipe[1:3] returns a new Sequential containing the slice.
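
Slicing is handy for inspecting an intermediate result: run only a prefix of the pipeline. The sketch assumes a hypothetical toy Seq with the same __getitem__ rule, and uses plain callables in place of operators.

```python
class Seq:
    # Toy stand-in for Sequential's indexing behaviour.
    def __init__(self, ops: list) -> None:
        self.ops = list(ops)

    def __len__(self) -> int:
        return len(self.ops)

    def __getitem__(self, key: "int | slice"):
        # A slice returns a new pipeline; an int returns the bare step.
        if isinstance(key, slice):
            return Seq(self.ops[key])
        return self.ops[key]

    def __call__(self, x):
        for op in self.ops:
            x = op(x)
        return x


pipe = Seq([lambda x: x + 1, lambda x: x * 10, lambda x: x - 3])
head = pipe[:2]   # new Seq holding the first two steps
first = pipe[0]   # the bare first step, not wrapped in a Seq
```

Because a slice is itself callable, head(x) shows the value just before the last step.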

__or__(other: Operator) -> Sequential

Append on the right; flatten nested Sequentials.

Sequential([a, b]) | c → Sequential([a, b, c])
Sequential([a, b]) | Sequential([c, d]) → Sequential([a, b, c, d])

Graphs

geotoolz.core._src.graph.Input dataclass

A named entry point into a Graph.

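Input instances are placeholders during graph construction. Operator.__call__ treats them as a graph-mode signal (same as Node). At evaluation time, Graph._apply binds each caller-supplied value to the placeholder stored under the same key in the Graph's inputs mapping (positional values are bound in declaration order).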

The dataclass disables __eq__ so Input instances compare by identity — necessary for the id(...)-keyed evaluation cache.
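
Identity equality is what keeps two placeholders with the same name from collapsing into one vertex. The sketch reuses the Input field layout shown below (trimmed) and adds ValueInput, a hypothetical counter-example with the default eq=True. Per the standard dataclasses rules, eq=True without frozen=True also sets __hash__ to None, which would make such objects unusable as set members or dict keys.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass(eq=False)
class Input:
    # Same fields as the Input above; eq=False keeps object identity semantics.
    name: str
    parents: tuple[Any, ...] = field(default_factory=tuple)


@dataclass
class ValueInput:
    # Hypothetical counter-example: default eq=True compares field values.
    name: str


a, b = Input("image"), Input("image")
cache = {id(a): "scene A", id(b): "scene B"}  # id-keyed, like Graph._apply
```

With value equality, the two "image" placeholders would be indistinguishable, and an evaluation cache could not tell their results apart.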

Source code in src/geotoolz/core/_src/graph.py
@dataclass(eq=False)
class Input:
    """A named entry point into a `Graph`.

    `Input` instances are placeholders during graph construction.
    `Operator.__call__` recognises them as graph mode (same as `Node`).
    `Graph._apply` consumes the keyword `**inputs` mapping by name.

    The dataclass disables ``__eq__`` so `Input` instances compare by
    identity — necessary for the ``id(...)``-keyed evaluation cache.
    """

    name: str
    parents: tuple[Any, ...] = field(default_factory=tuple)
    operator: Operator | None = None

geotoolz.core._src.graph.Node dataclass

A non-input vertex in a Graph.

Created automatically by Operator.__call__ when any argument is an Input or another Node. Carries the operator and its parents (other Input / Node instances).

Like Input, equality is by identity for the evaluation cache.

Source code in src/geotoolz/core/_src/graph.py
@dataclass(eq=False)
class Node:
    """A non-input vertex in a `Graph`.

    Created automatically by `Operator.__call__` when any argument is an
    `Input` or another `Node`. Carries the operator and its parents
    (other `Input` / `Node` instances).

    Like `Input`, equality is by identity for the evaluation cache.
    """

    operator: Operator
    parents: tuple[Any, ...]

geotoolz.core._src.graph.Graph

Bases: Operator

A symbolic operator graph with multiple inputs and outputs.

Construction is implicit — calling operators on Input / Node instances builds the graph; Graph(inputs=..., outputs=...) wraps the result. _apply(**inputs) evaluates the graph in topological order.

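Inherits from Operator so a Graph satisfies the same interface as any other operator. Operator.__call__ forwards the call to Graph._apply, which accepts inputs either by keyword or positionally; positional values are bound to the declared inputs in declaration order, which lets single-input graphs compose with Sequential and nest inside other Graphs. Passing an Input or Node positionally builds a new Node instead of evaluating.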
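
The evaluation strategy (depth-first topological order with cycle detection, then one pass over an id-keyed cache) can be sketched without geotoolz. This is a toy model under stated assumptions: topo_order, evaluate, and double are hypothetical names, a Node holds a plain function instead of an Operator, and the "undeclared Input" check is omitted for brevity.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass(eq=False)
class Input:
    name: str
    parents: tuple = field(default_factory=tuple)


@dataclass(eq=False)
class Node:
    fn: Callable[..., Any]  # assumption: a plain function stands in for an Operator
    parents: tuple


def topo_order(outputs: dict) -> list:
    # Depth-first post-order over parents; Inputs are sources, not work.
    order, visited, on_stack = [], set(), set()

    def visit(v) -> None:
        if id(v) in visited:
            return
        if id(v) in on_stack:
            raise ValueError("Cycle detected in graph")
        on_stack.add(id(v))
        for p in v.parents:
            visit(p)
        on_stack.discard(id(v))
        visited.add(id(v))
        if isinstance(v, Node):
            order.append(v)

    for out in outputs.values():
        visit(out)
    return order


def evaluate(inputs: dict, outputs: dict, **values: Any) -> dict:
    # Seed the id-keyed cache with caller values, then run each Node once.
    cache = {id(inputs[name]): values[name] for name in inputs}
    for node in topo_order(outputs):
        cache[id(node)] = node.fn(*(cache[id(p)] for p in node.parents))
    return {name: cache[id(v)] for name, v in outputs.items()}


calls: list = []


def double(x):
    calls.append("double")  # count evaluations of the shared node
    return 2 * x


img, ref = Input("image"), Input("reference")
doubled = Node(double, (img,))
diff = Node(lambda d, r: d - r, (doubled, ref))
result = evaluate({"image": img, "reference": ref},
                  {"doubled": doubled, "diff": diff}, image=5, reference=3)
```

The shared node feeds two outputs but runs only once, because its result is cached under its identity.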

Parameters:

inputs (dict[str, Input], required): Map of input-name → Input placeholders. The keys are the keyword names accepted by __call__.

outputs (dict[str, Node | Input], required): Map of output-name → Node (or Input, if the output is a direct passthrough). The keys are the keys of the returned dict.

Examples:

Two-input, two-output graph::

img = Input("image")
ref = Input("reference")
ndvi = NDVI(red_idx=2, nir_idx=3)(img)
rmse = RMSE(axis=(-2, -1))(ndvi, ref)

g = Graph(
    inputs={"image": img, "reference": ref},
    outputs={"ndvi": ndvi, "rmse": rmse},
)
result = g(image=img_gt, reference=ref_gt)
# {"ndvi": GeoTensor, "rmse": scalar}

Raises:

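ValueError: at construction, if the graph contains a cycle, if an output node isn't reachable from any input, or if an Input referenced by a node isn't declared in inputs. When the graph is called, a missing declared input also raises ValueError.

TypeError: when the graph is called with both positional and keyword inputs, or with a number of positional inputs that doesn't match the declared inputs.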

Source code in src/geotoolz/core/_src/graph.py
class Graph(Operator):
    """A symbolic operator graph with multiple inputs and outputs.

    Construction is implicit — calling operators on `Input` / `Node`
    instances builds the graph; ``Graph(inputs=..., outputs=...)`` wraps
    the result. ``_apply(**inputs)`` evaluates the graph in topological
    order.

    Inherits from `Operator` so a `Graph` satisfies the same interface
    as any other operator. ``Operator.__call__`` forwards inputs to
    ``Graph._apply``, by keyword or positionally in declaration order.

    Args:
        inputs: Map of ``input-name → Input`` placeholders. The keys are
            the keyword names accepted by ``__call__``.
        outputs: Map of ``output-name → Node`` (or ``Input``, if the
            output is a direct passthrough). The keys are the keys of the
            returned dict.

    Examples:
        Two-input, two-output graph::

            img = Input("image")
            ref = Input("reference")
            ndvi = NDVI(red_idx=2, nir_idx=3)(img)
            rmse = RMSE(axis=(-2, -1))(ndvi, ref)

            g = Graph(
                inputs={"image": img, "reference": ref},
                outputs={"ndvi": ndvi, "rmse": rmse},
            )
            result = g(image=img_gt, reference=ref_gt)
            # {"ndvi": GeoTensor, "rmse": scalar}

    Raises:
        ValueError: if the graph contains a cycle, if an output node
            isn't reachable from any input, or if an `Input` referenced
            by a node isn't declared in ``inputs``.
    """

    def __init__(
        self,
        inputs: dict[str, Input],
        outputs: dict[str, Node | Input],
    ) -> None:
        self.inputs = inputs
        self.outputs = outputs
        self._order = self._topological_sort()

    def _topological_sort(self) -> list[Node]:
        """Return a topological ordering of internal `Node`s.

        Inputs are not included (they are supplied by the caller, not
        computed). Output `Input`s aren't either — they pass straight
        through.
        """
        declared_inputs = set(map(id, self.inputs.values()))
        order: list[Node] = []
        visited: set[int] = set()
        on_stack: set[int] = set()

        def visit(node: Input | Node) -> None:
            node_id = id(node)
            if node_id in visited:
                return
            if node_id in on_stack:
                raise ValueError(
                    "Cycle detected in graph — operator graphs must be DAGs."
                )
            on_stack.add(node_id)
            for parent in node.parents:
                visit(parent)
            on_stack.discard(node_id)
            visited.add(node_id)
            # Inputs are sources, not work to do during _apply.
            if isinstance(node, Input):
                if node_id not in declared_inputs:
                    raise ValueError(
                        f"Input {node.name!r} is referenced by an output but "
                        f"not declared in `inputs=`."
                    )
                return
            order.append(node)

        for output in self.outputs.values():
            visit(output)
        return order

    def _apply(self, *args: Carrier, **inputs: Carrier) -> dict[str, Any]:
        """Evaluate the graph with the supplied inputs.

        Accepts inputs either positionally (bound to declared `Input`s in
        construction order) or by keyword. The positional form makes
        single-input graphs compose with `Sequential` and lets `Graph`s
        nest inside other `Graph`s — both shapes route values through
        `Operator.__call__`, which only knows how to splat positionally.

        Args:
            *args: One value per declared `Input`, in declaration order.
                Mutually exclusive with ``**inputs``.
            **inputs: One value per declared `Input`, keyed by name.

        Returns:
            ``{output-name: result}`` for each declared output.
        """
        if args and inputs:
            raise TypeError(
                "Graph._apply accepts either positional args (bound to inputs "
                "in declaration order) or keyword inputs, not both."
            )
        if args:
            if len(args) != len(self.inputs):
                raise TypeError(
                    f"Graph expected {len(self.inputs)} positional argument(s) "
                    f"to bind to inputs {list(self.inputs)}, got {len(args)}."
                )
            inputs = dict(zip(self.inputs, args, strict=True))

        missing = set(self.inputs) - set(inputs)
        if missing:
            raise ValueError(f"Graph missing required input(s): {sorted(missing)}")

        cache: dict[int, Any] = {
            id(self.inputs[name]): inputs[name] for name in self.inputs
        }
        for node in self._order:
            node_args = tuple(cache[id(p)] for p in node.parents)
            # Route through __call__ so nested operators (Graph, Sequential)
            # get their own dispatch, not just bare _apply.
            cache[id(node)] = node.operator(*node_args)

        return {name: cache[id(node)] for name, node in self.outputs.items()}

    def get_config(self) -> dict[str, Any]:
        """Best-effort config — node operators' configs, by output name.

        Graphs are inherently runtime-defined (the topology comes from
        Python object identity), so the config is a debug repr rather than
        a faithful YAML round-trip. Future YAML support would store the
        topology as a list of (op, parent-keys) records.
        """
        return {
            "inputs": list(self.inputs),
            "outputs": {
                name: {
                    "class": type(node.operator).__name__
                    if isinstance(node, Node)
                    else "Input",
                    "config": node.operator.get_config()
                    if isinstance(node, Node)
                    else {},
                }
                for name, node in self.outputs.items()
            },
        }

    def __repr__(self) -> str:
        ins = ", ".join(self.inputs)
        outs = ", ".join(self.outputs)
        return f"Graph(inputs=[{ins}], outputs=[{outs}])"

get_config() -> dict[str, Any]

Best-effort config — node operators' configs, by output name.

Graphs are inherently runtime-defined (the topology comes from Python object identity), so the config is a debug repr rather than a faithful YAML round-trip. Future YAML support would store the topology as a list of (op, parent-keys) records.

geotoolz.core._src.composition.Fanout

Bases: Operator

One input → dict of outputs (sugar over Graph).

Each branch is applied to the same input GeoTensor; the outputs are returned as a dict keyed by the branch name.
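
The fan-out pattern reduces to "same input, one call per branch, dict of results". In the sketch below, a hypothetical toy Fanout works on a single pixel stored as a band dict, and plain functions stand in for the NDVI and NDWI operators. The formulas are the standard normalized differences, NDVI = (NIR − Red) / (NIR + Red) and NDWI = (Green − NIR) / (Green + NIR); the band values are made up for illustration.

```python
from typing import Any, Callable


class Fanout:
    # Toy stand-in: same input, one call per branch, dict of results.
    def __init__(self, branches: dict) -> None:
        if not branches:
            raise TypeError("Fanout requires at least one branch.")
        self.branches: dict[str, Callable[[Any], Any]] = dict(branches)

    def __call__(self, x: Any) -> dict:
        return {name: fn(x) for name, fn in self.branches.items()}


def normalized_difference(a: float, b: float) -> float:
    # (a - b) / (a + b), the formula behind NDVI and NDWI.
    return (a - b) / (a + b)


# Hypothetical single pixel: green, red, and NIR reflectances.
pixel = {"green": 0.1, "red": 0.2, "nir": 0.6}
products = Fanout({
    "ndvi": lambda p: normalized_difference(p["nir"], p["red"]),
    "ndwi": lambda p: normalized_difference(p["green"], p["nir"]),
})(pixel)
```

Each branch sees the identical input object, so the input is loaded once regardless of how many products are derived from it.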

Parameters:

branches (dict[str, Operator], required): Map of output-name → Operator. Each operator receives the same input independently and contributes one entry to the returned dict.

Examples:

Compute three indices from one scene with one read::

products = Fanout({
    "ndvi": NDVI(red_idx=2, nir_idx=3),
    "ndwi": NDWI(green_idx=1, nir_idx=3),
    "rgb":  S2_L2A_RGB(),
})(gt)
# {"ndvi": GeoTensor, "ndwi": GeoTensor, "rgb": GeoTensor}

Equivalent Graph form (more verbose, identical result)::

img = Input("image")
g = Graph(
    inputs={"image": img},
    outputs={
        "ndvi": NDVI(red_idx=2, nir_idx=3)(img),
        "ndwi": NDWI(green_idx=1, nir_idx=3)(img),
        "rgb":  S2_L2A_RGB()(img),
    },
)

Raises:

TypeError: if any branch is not an Operator, or if no branches are provided.

Source code in src/geotoolz/core/_src/composition.py
class Fanout(Operator):
    """One input → dict of outputs (sugar over `Graph`).

    Each branch is applied to the same input GeoTensor; the outputs are
    returned as a dict keyed by the branch name.

    Args:
        branches: Map of ``output-name → Operator``. Each operator
            receives the same input independently and contributes one
            entry to the returned dict.

    Examples:
        Compute three indices from one scene with one read::

            products = Fanout({
                "ndvi": NDVI(red_idx=2, nir_idx=3),
                "ndwi": NDWI(green_idx=1, nir_idx=3),
                "rgb":  S2_L2A_RGB(),
            })(gt)
            # {"ndvi": GeoTensor, "ndwi": GeoTensor, "rgb": GeoTensor}

        Equivalent `Graph` form (more verbose, identical result)::

            img = Input("image")
            g = Graph(
                inputs={"image": img},
                outputs={
                    "ndvi": NDVI(red_idx=2, nir_idx=3)(img),
                    "ndwi": NDWI(green_idx=1, nir_idx=3)(img),
                    "rgb":  S2_L2A_RGB()(img),
                },
            )

    Raises:
        TypeError: if any branch is not an `Operator`, or if no branches
            are provided.
    """

    def __init__(self, branches: dict[str, Operator]) -> None:
        if not branches:
            raise TypeError("Fanout requires at least one branch.")
        for name, op in branches.items():
            if not isinstance(op, Operator):
                raise TypeError(
                    f"Fanout branch {name!r} is {type(op).__name__}, expected Operator."
                )
        self.branches = dict(branches)

    def _apply(self, gt: Carrier) -> dict[str, Any]:
        return {name: op(gt) for name, op in self.branches.items()}

    def get_config(self) -> dict[str, Any]:
        return {
            "branches": {
                name: {
                    "class": type(op).__name__,
                    "config": op.get_config(),
                }
                for name, op in self.branches.items()
            }
        }
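Because `Fanout._apply` hands every branch the same input object, a branch that modifies its input in place changes what later branches see. The sketch below reproduces that dispatch with plain functions standing in for operators. `fanout`, `scale_in_place` and `band_sum` are hypothetical names for illustration only, and the sketch assumes `Operator.__call__` does not copy its argument in eager mode.

```python
import numpy as np


def fanout(branches, x):
    """Mirror of Fanout._apply: call every branch on the same input object."""
    return {name: fn(x) for name, fn in branches.items()}


def scale_in_place(arr):
    # Badly behaved branch: mutates its input instead of returning a new array.
    arr *= 10
    return arr


def band_sum(arr):
    # Well-behaved branch: reads the input and returns a fresh value.
    return float(arr.sum())


x = np.ones((2, 2))
out = fanout({"scaled": scale_in_place, "total": band_sum}, x)
print(out["total"])  # 40.0, not 4.0: "total" ran after "scaled" mutated x
```

Branches that need to modify data should work on a copy (for example `arr.copy()`), so that the order of keys in `branches` never affects the results.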

Inference

geotoolz.core._src.model.ModelOp

Bases: Operator

Wrap any callable as an Operator.

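Materialises the GeoTensor to a plain np.ndarray (via np.asarray) before handing it to the model, so the model always receives an ordinary array without georeferencing. Frameworks that would strip the subclass anyway (torch, JAX, sklearn) lose nothing, and NumPy-based models still receive a valid array. The model's return value is passed back as-is: ModelOp does not re-wrap it as a GeoTensor.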

Parameters:

model (Any, required): Any object that can be called as model(arr), or that has a method whose name is given by method (for example model.predict(arr)). ModelOp performs no isinstance checks and imports no ML framework.

method (str, default '__call__'): Name of the method to invoke on model. The default, "__call__", is equivalent to model(arr). Set to "predict" for sklearn estimators.

batch_size (int | None, default None): If set, split the input along axis 0 into chunks of this size, call the model once per chunk, and concatenate the results along axis 0. Useful when the whole input does not fit in GPU memory at once.

Note

forbid_in_yaml = True — the model is a runtime object and won't round-trip to YAML. Users typically pin a model artifact (state-dict + class config) themselves.

Examples:

Inference with a sklearn classifier::

op = ModelOp(rf_clf, method="predict")
preds = op(features_gt)

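Batched inference with a torch model (wrapped so that it accepts and returns NumPy arrays, since a bare torch.nn.Module expects torch.Tensor inputs)::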

op = ModelOp(unet_model, batch_size=8)
preds = op(chips_gt)  # iterates 8 chips at a time
Source code in src/geotoolz/core/_src/model.py
class ModelOp(Operator):
    """Wrap any callable as an Operator.

    Materialises the GeoTensor to a plain ``np.ndarray`` (via
    ``np.asarray``) before handing it to the model — frameworks that
    strip the subclass (torch, JAX, sklearn) don't care, and frameworks
    that preserve it (numpy proper) still see something sensible.

    Args:
        model: Any object that can be called as ``model(arr)`` or whose
            ``method`` attribute can be called as
            ``model.predict(arr)``. No isinstance / framework imports.
        method: Method name to invoke on ``model``. Default
            ``"__call__"`` — equivalent to ``model(arr)``. Set to
            ``"predict"`` for sklearn estimators.
        batch_size: If set, split the input along axis 0 into chunks of
            this size, call the model once per chunk, concatenate the
            results along axis 0. Useful when the model can't fit the
            whole input in GPU memory.

    Note:
        ``forbid_in_yaml = True`` — the model is a runtime object and
        won't round-trip to YAML. Users typically pin a model artifact
        (state-dict + class config) themselves.

    Examples:
        Inference with a sklearn classifier::

            op = ModelOp(rf_clf, method="predict")
            preds = op(features_gt)

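        Batched inference with a torch model (wrapped so that it accepts
        and returns NumPy arrays; a bare ``torch.nn.Module`` expects
        ``torch.Tensor`` inputs)::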

            op = ModelOp(unet_model, batch_size=8)
            preds = op(chips_gt)  # iterates 8 chips at a time
    """

    forbid_in_yaml: ClassVar[bool] = True

    def __init__(
        self,
        model: Any,
        *,
        method: str = "__call__",
        batch_size: int | None = None,
    ) -> None:
        self.model = model
        self.method = method
        self.batch_size = batch_size

    def _resolve_callable(self) -> Any:
        if self.method == "__call__":
            return self.model
        return getattr(self.model, self.method)

    def _apply(self, gt: Carrier) -> Any:
        arr = np.asarray(gt)
        fn = self._resolve_callable()
        if self.batch_size is None:
            return fn(arr)
        return self._batched(fn, arr)

    def _batched(self, fn: Any, arr: np.ndarray) -> np.ndarray:
        """Split ``arr`` along axis 0, call ``fn`` per chunk, concatenate.

        Plain ``np.concatenate`` along axis 0 — works when the model's
        output preserves the batch dimension (the common case).

        Empty inputs (``arr.shape[0] == 0``) are passed straight to the
        model in one call: ``np.concatenate`` cannot accept an empty list
        of chunks, and the model is free to return a meaningful
        zero-length result.
        """
        n = arr.shape[0]
        if n == 0:
            return fn(arr)
        chunks: list[Any] = []
        bs = int(self.batch_size or n)
        for start in range(0, n, bs):
            chunks.append(fn(arr[start : start + bs]))
        return np.concatenate(chunks, axis=0)

    def get_config(self) -> dict[str, Any]:
        return {
            "model_type": type(self.model).__name__,
            "method": self.method,
            "batch_size": self.batch_size,
        }
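The method resolution and batching in `ModelOp` can be exercised without any ML framework. This sketch copies the logic of `_resolve_callable` and `_batched` into standalone functions and uses a hypothetical `RowMeanModel` with an sklearn-style `predict` method; `resolve`, `batched` and `RowMeanModel` are illustrative names, not geotoolz API.

```python
import numpy as np


class RowMeanModel:
    """Hypothetical sklearn-style estimator: predict() returns one value per row."""

    def __init__(self):
        self.calls = 0  # number of chunks the model has been called with

    def predict(self, arr):
        self.calls += 1
        return arr.reshape(arr.shape[0], -1).mean(axis=1)


def resolve(model, method="__call__"):
    # Same rule as ModelOp._resolve_callable.
    return model if method == "__call__" else getattr(model, method)


def batched(fn, arr, batch_size):
    # Same rule as ModelOp._batched: chunk along axis 0, concatenate along axis 0.
    n = arr.shape[0]
    if n == 0:
        return fn(arr)
    bs = int(batch_size or n)
    return np.concatenate([fn(arr[i : i + bs]) for i in range(0, n, bs)], axis=0)


chips = np.arange(10 * 4 * 4, dtype=float).reshape(10, 4, 4)
model = RowMeanModel()
preds = batched(resolve(model, "predict"), chips, batch_size=4)
print(model.calls, preds.shape)  # 3 (10,): chunks of 4, 4 and 2
```

Batching only reproduces the single-call result when the model treats samples independently; a model that computes statistics across the whole batch can return different values per chunk.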

Observers — identity with side effects

geotoolz.core._src.observers.Tap

Bases: Operator

Identity operator with a side effect.

Calls fn(gt) and returns gt unchanged. The return value of fn is ignored — Tap is for side effects, not transforms. If you want to transform, use Lambda or write a real Operator.

forbid_in_yaml = True because the callback closure can't round-trip.

Parameters:

fn (Callable[[Carrier], Any], required): A callable (gt) -> Any invoked for its side effect.

name (str, default 'tap'): Display name for repr() / provenance.

Examples:

Log NaN fraction between steps::

Sequential([
    MaskClouds(...),
    Tap(lambda gt: print(f"NaN: {np.isnan(gt).mean():.1%}")),
    NDVI(...),
])
Source code in src/geotoolz/core/_src/observers.py
class Tap(Operator):
    """Identity operator with a side effect.

    Calls ``fn(gt)`` and returns ``gt`` unchanged. The return value of
    ``fn`` is ignored — `Tap` is for side effects, not transforms. If
    you want to transform, use `Lambda` or write a real `Operator`.

    ``forbid_in_yaml = True`` because the callback closure can't
    round-trip.

    Args:
        fn: A callable ``(gt) -> Any`` invoked for its side effect.
        name: Display name for ``repr()`` / provenance. Default ``"tap"``.

    Examples:
        Log NaN fraction between steps::

            Sequential([
                MaskClouds(...),
                Tap(lambda gt: print(f"NaN: {np.isnan(gt).mean():.1%}")),
                NDVI(...),
            ])
    """

    forbid_in_yaml: ClassVar[bool] = True

    def __init__(self, fn: Callable[[Carrier], Any], *, name: str = "tap") -> None:
        self.fn = fn
        self.name = name

    def _apply(self, gt: _C) -> _C:
        self.fn(gt)
        return gt

    def get_config(self) -> dict[str, Any]:
        return {"name": self.name}
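Printing is only one use of `Tap`; the callback can just as well append to a list for later inspection. So that it runs without geotoolz, the sketch uses a hypothetical `tap` helper that mirrors `Tap._apply` (call `fn`, discard its result, return the input).

```python
import numpy as np


def tap(fn):
    """Mirror of Tap._apply: run fn for its side effect, return the input as-is."""
    def step(x):
        fn(x)  # return value deliberately ignored
        return x
    return step


nan_fractions = []
record = tap(lambda a: nan_fractions.append(float(np.isnan(a).mean())))

x = np.array([1.0, np.nan, 3.0, np.nan])
y = record(x)
print(y is x, nan_fractions)  # True [0.5]
```

With geotoolz, the same lambda would be passed as `Tap(..., name="nan_fraction")`, and `nan_fractions` would hold one entry per pipeline run.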

geotoolz.core._src.observers.Snapshot

Controller that produces snapshot-taking operators.

Not an Operator itself — Snapshot.at(key) returns the operator you drop into a Sequential. After the pipeline runs, every named intermediate is available via snap[key].

Stores references, not copies — if a downstream op mutates the array in place, your snapshot sees the mutation too. Add explicit copies in exploratory work if needed.

Examples:

snap = Snapshot()
pipe = Sequential([
    op1, snap.at("after_op1"),
    op2, snap.at("after_op2"),
])
pipe(gt)
print(snap.keys())             # dict_keys(['after_op1', 'after_op2'])
inspect = snap["after_op1"]
Source code in src/geotoolz/core/_src/observers.py
class Snapshot:
    """Controller that produces snapshot-taking operators.

    Not an `Operator` itself — `Snapshot.at(key)` returns the operator
    you drop into a `Sequential`. After the pipeline runs, every named
    intermediate is available via ``snap[key]``.

    Stores *references*, not copies — if a downstream op mutates the
    array in place, your snapshot sees the mutation too. Add explicit
    copies in exploratory work if needed.

    Examples:
        ::

            snap = Snapshot()
            pipe = Sequential([
                op1, snap.at("after_op1"),
                op2, snap.at("after_op2"),
            ])
            pipe(gt)
            print(snap.keys())             # dict_keys(["after_op1", "after_op2"])
            inspect = snap["after_op1"]
    """

    def __init__(self) -> None:
        self._store: dict[str, Any] = {}

    def at(self, key: str) -> _SnapshotTap:
        """Return an operator that captures the GeoTensor under ``key``."""
        return _SnapshotTap(self._store, key)

    def __getitem__(self, key: str) -> Any:
        return self._store[key]

    def __contains__(self, key: str) -> bool:
        return key in self._store

    def keys(self):
        return self._store.keys()

    def items(self):
        return self._store.items()

    def clear(self) -> None:
        self._store.clear()

at(key: str) -> _SnapshotTap

Return an operator that captures the GeoTensor under key.

Source code in src/geotoolz/core/_src/observers.py
def at(self, key: str) -> _SnapshotTap:
    """Return an operator that captures the GeoTensor under ``key``."""
    return _SnapshotTap(self._store, key)
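The reference-not-copy caveat is easy to demonstrate. The sketch below uses a plain dict as the store (as `Snapshot` does internally) and a hypothetical `capture` helper standing in for `snap.at(key)`; the source of `_SnapshotTap` is not shown here, so the sketch assumes it stores the value it receives without copying, as the docstring states. `clip_negative` plays the role of a downstream op that mutates its input.

```python
import numpy as np

store = {}


def capture(key, copy=False):
    """Stand-in for snap.at(key): record the value under key, pass it through."""
    def step(x):
        store[key] = x.copy() if copy else x  # reference unless copy=True
        return x
    return step


def clip_negative(x):
    # Downstream op that mutates its input in place.
    np.maximum(x, 0, out=x)
    return x


x = np.array([-1.0, 2.0])
clip_negative(capture("ref")(x))
print(store["ref"])   # [0. 2.]: the snapshot changed after capture

x = np.array([-1.0, 2.0])
clip_negative(capture("copy", copy=True)(x))
print(store["copy"])  # [-1.  2.]: the copy kept the pre-clip values
```

In a pipeline, the copy has to happen after `snap.at(...)`, so that downstream ops mutate the copy rather than the captured object, for example `Lambda(lambda gt: gt.copy())` placed right after the snapshot step (assuming the carrier has a `.copy()` method). A copying step placed before the snapshot would not help, because the snapshot would then hold the very object that flows downstream.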

geotoolz.core._src.observers.ShapeTrace

Bases: Operator

Log shape, dtype and CRS at every step.

Drop one between steps of a Sequential to see what is happening to the carrier. With mode="diff_only", a call prints nothing when its description matches the last line printed by the same instance; separate ShapeTrace instances do not share this state.

Parameters:

printer (Callable[[str], Any], default print): Callable used to emit each line. Defaults to builtins.print; pass e.g. logger.info to route output through logging.

mode (str, default 'every'): "every" logs every call; "diff_only" suppresses output when the description is unchanged since this instance last printed. Any other value raises ValueError.

Examples:

Sequential([
    ShapeTrace(),
    op1,
    ShapeTrace(),
    op2,
    ShapeTrace(),
])(gt)
Source code in src/geotoolz/core/_src/observers.py
class ShapeTrace(Operator):
    """Log shape, dtype and CRS at every step.

    Drop one between steps of a `Sequential` to see what's happening to
    the carrier. Optional ``mode="diff_only"`` skips lines that don't
    change anything from the previous trace.

    Args:
        printer: Callable used to print each line. Default
            ``builtins.print``. Override with ``log.info`` etc.
        mode: ``"every"`` (default) logs every call; ``"diff_only"``
            suppresses output when nothing changed since the last call.

    Examples:
        ::

            Sequential([
                ShapeTrace(),
                op1,
                ShapeTrace(),
                op2,
                ShapeTrace(),
            ])(gt)
    """

    _MODES: ClassVar[tuple[str, ...]] = ("every", "diff_only")

    def __init__(
        self,
        *,
        printer: Callable[[str], Any] = print,
        mode: str = "every",
    ) -> None:
        if mode not in self._MODES:
            raise ValueError(f"mode must be one of {self._MODES}, got {mode!r}")
        self._printer = printer
        self.mode = mode
        self._last_line: str | None = None

    def _describe(self, gt: Carrier) -> str:
        shape = getattr(gt, "shape", None)
        dtype = getattr(gt, "dtype", None)
        crs = getattr(gt, "crs", None)
        return f"shape={shape} dtype={dtype} crs={crs}"

    def _apply(self, gt: _C) -> _C:
        line = self._describe(gt)
        if self.mode == "diff_only" and line == self._last_line:
            return gt
        self._printer(line)
        self._last_line = line
        return gt

    def get_config(self) -> dict[str, Any]:
        return {"mode": self.mode}
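`diff_only` compares against `self._last_line`, which is stored per instance. Three separate `ShapeTrace(mode="diff_only")` objects therefore each print on their first call; only reusing one instance suppresses repeated descriptions. The sketch reproduces that state handling in a hypothetical `MiniTrace` class (shape and dtype only, no CRS), with a list standing in for the printer.

```python
import numpy as np


class MiniTrace:
    """Reduced copy of ShapeTrace's diff_only logic (shape and dtype only)."""

    def __init__(self, printer):
        self.printer = printer
        self._last_line = None  # per-instance memory of the last printed line

    def __call__(self, x):
        line = f"shape={x.shape} dtype={x.dtype}"
        if line != self._last_line:
            self.printer(line)
            self._last_line = line
        return x


lines = []
x = np.zeros((3, 8, 8), dtype=np.float32)

# Three separate instances: each starts with no memory, so every one prints.
for trace in [MiniTrace(lines.append) for _ in range(3)]:
    trace(x)
n_separate = len(lines)
print(n_separate)  # 3

# One shared instance: repeats of an unchanged description are suppressed.
lines.clear()
shared = MiniTrace(lines.append)
for _ in range(3):
    shared(x)
print(len(lines))  # 1
```

With geotoolz, the equivalent is to create one `tracer = ShapeTrace(mode="diff_only")` and place that same object between steps, assuming `Sequential` accepts the same operator instance more than once.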

Control flow

geotoolz.core._src.control.Branch

Bases: Operator

Apply if_true when predicate(gt) is truthy, else if_false.

Parameters:

predicate (Callable[[Carrier], bool], required): Callable (gt) -> bool.

if_true (Operator, required): Operator applied when the predicate returns a truthy value.

if_false (Operator | None, default None): Operator applied otherwise. None is replaced by Identity().

Examples:

Only correct atmospherically if the scene is reasonably clear::

Sequential([
    Branch(
        predicate=lambda gt: cloud_fraction(gt) < 0.3,
        if_true=TOAToBOA(...),
        if_false=Identity(),
    ),
    NDVI(...),
])
Source code in src/geotoolz/core/_src/control.py
class Branch(Operator):
    """Apply ``if_true`` when ``predicate(gt)`` is truthy, else
    ``if_false``.

    Args:
        predicate: Callable ``(gt) -> bool``.
        if_true: Operator applied when the predicate returns truthy.
        if_false: Operator applied otherwise. Default `Identity()`.

    Examples:
        Only correct atmospherically if the scene is reasonably clear::

            Sequential([
                Branch(
                    predicate=lambda gt: cloud_fraction(gt) < 0.3,
                    if_true=TOAToBOA(...),
                    if_false=Identity(),
                ),
                NDVI(...),
            ])
    """

    forbid_in_yaml: ClassVar[bool] = True

    def __init__(
        self,
        *,
        predicate: Callable[[Carrier], bool],
        if_true: Operator,
        if_false: Operator | None = None,
    ) -> None:
        if not isinstance(if_true, Operator):
            raise TypeError(
                f"if_true must be an Operator, got {type(if_true).__name__}."
            )
        if if_false is not None and not isinstance(if_false, Operator):
            raise TypeError(
                f"if_false must be an Operator or None, got {type(if_false).__name__}."
            )
        self.predicate = predicate
        self.if_true = if_true
        self.if_false = if_false if if_false is not None else Identity()

    def _apply(self, gt: Carrier) -> Any:
        # Output type is ``Any`` rather than ``Carrier``: each arm is an
        # arbitrary Operator and may legitimately transform the carrier.
        if self.predicate(gt):
            return self.if_true(gt)
        return self.if_false(gt)

    def get_config(self) -> dict[str, Any]:
        return {
            "predicate": getattr(self.predicate, "__name__", repr(self.predicate)),
            "if_true": {
                "class": type(self.if_true).__name__,
                "config": self.if_true.get_config(),
            },
            "if_false": {
                "class": type(self.if_false).__name__,
                "config": self.if_false.get_config(),
            },
        }
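`Branch._apply` evaluates `if self.predicate(gt):`, so the predicate has to reduce the carrier to a single truth value. An element-wise comparison on an array returns an array, and NumPy refuses to take its truth value. The sketch uses a hypothetical `nan_fraction` helper in place of the `cloud_fraction` from the example above (whose definition is not given here) and a `branch` function mirroring `Branch._apply` with plain callables.

```python
import numpy as np


def nan_fraction(arr):
    # Hypothetical stand-in for cloud_fraction: share of NaN pixels, as a float.
    return float(np.isnan(arr).mean())


def branch(predicate, if_true, if_false, x):
    """Mirror of Branch._apply using plain callables instead of Operators."""
    return if_true(x) if predicate(x) else if_false(x)


def double(a):
    return a * 2


def identity(a):
    return a


scene = np.array([[0.2, np.nan], [0.4, 0.5]])

# Scalar predicate: one truth value, so dispatch works (0.25 < 0.3 -> double).
result = branch(lambda a: nan_fraction(a) < 0.3, double, identity, scene)

# Element-wise predicate: returns a 2x2 boolean array with no single truth
# value, so the truth test inside branch() raises ValueError.
message = None
try:
    branch(lambda a: a > 0.3, double, identity, scene)
except ValueError as err:
    message = str(err)
print(message)
```

Reducing with `.any()`, `.all()` or a fraction, as `nan_fraction` does, gives the predicate the single value it needs.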

geotoolz.core._src.control.Switch

Bases: Operator

Multi-way dispatch on key(gt).

Computes k = key(gt), then runs cases[k](gt). If k is not in cases, runs default(gt).

Parameters:

key (Callable[[Carrier], Any], required): Callable (gt) -> hashable whose return value selects a case. Common patterns: lambda gt: gt.metadata["sensor"], lambda gt: gt.dtype.name.

cases (dict[Any, Operator], required): Map from key value to the Operator to run.

default (Operator | None, default None): Operator applied when no case matches. None is replaced by Identity() (silent passthrough); pass a custom operator that raises if you want strict-mode behaviour.

Examples:

Cross-sensor pipeline::

Switch(
    key=lambda gt: gt.metadata["sensor"],
    cases={
        "S2":      S2_L2A_NDVI(),
        "Landsat": L8_BOA_NDVI(),
    },
)
Source code in src/geotoolz/core/_src/control.py
class Switch(Operator):
    """Multi-way dispatch on ``key(gt)``.

    Computes ``k = key(gt)``, then runs ``cases[k](gt)``. If ``k`` is not
    in ``cases``, runs ``default(gt)``.

    Args:
        key: Callable ``(gt) -> hashable`` whose return value selects a
            case. Common patterns: ``lambda gt: gt.metadata["sensor"]``,
            ``lambda gt: gt.dtype.name``.
        cases: Map of ``key-value → Operator``.
        default: Operator applied when no case matches. Default
            `Identity()` (silent passthrough). Pass a custom operator
            that raises if you want strict-mode behaviour.

    Examples:
        Cross-sensor pipeline::

            Switch(
                key=lambda gt: gt.metadata["sensor"],
                cases={
                    "S2":      S2_L2A_NDVI(),
                    "Landsat": L8_BOA_NDVI(),
                },
            )
    """

    forbid_in_yaml: ClassVar[bool] = True

    def __init__(
        self,
        *,
        key: Callable[[Carrier], Any],
        cases: dict[Any, Operator],
        default: Operator | None = None,
    ) -> None:
        for k, op in cases.items():
            if not isinstance(op, Operator):
                raise TypeError(
                    f"Switch case {k!r} is {type(op).__name__}, expected Operator."
                )
        if default is not None and not isinstance(default, Operator):
            raise TypeError(
                f"default must be an Operator or None, got {type(default).__name__}."
            )
        self.key = key
        self.cases = dict(cases)
        self.default = default if default is not None else Identity()

    def _apply(self, gt: Carrier) -> Any:
        k = self.key(gt)
        op = self.cases.get(k, self.default)
        return op(gt)

    def get_config(self) -> dict[str, Any]:
        return {
            "key": getattr(self.key, "__name__", repr(self.key)),
            "cases": {
                str(k): {
                    "class": type(op).__name__,
                    "config": op.get_config(),
                }
                for k, op in self.cases.items()
            },
            "default": {
                "class": type(self.default).__name__,
                "config": self.default.get_config(),
            },
        }
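The `default` operator receives only `gt`, not the computed key, so a strict-mode default that wants to name the unmatched key has to call the key function again. A minimal sketch of that idea, with plain dicts standing in for carriers, a `switch` function mirroring `Switch._apply`, and a hypothetical `strict_default` factory that is not part of geotoolz:

```python
def switch(key, cases, default, x):
    """Mirror of Switch._apply: run cases[key(x)], or default if the key is absent."""
    return cases.get(key(x), default)(x)


def strict_default(key):
    """Hypothetical strict-mode default: recompute the key so the error can name it."""
    def fail(x):
        raise KeyError(f"no case registered for key {key(x)!r}")
    return fail


def sensor(scene):
    return scene["sensor"]


cases = {"S2": lambda s: "s2-ndvi", "Landsat": lambda s: "l8-ndvi"}
strict = strict_default(sensor)

print(switch(sensor, cases, strict, {"sensor": "S2"}))  # s2-ndvi

error = None
try:
    switch(sensor, cases, strict, {"sensor": "MODIS"})
except KeyError as err:
    error = err
print(error)
```

In geotoolz, the body of `fail` would go into the `_apply` of a small `Operator` subclass passed as `default=`. Since that subclass would hold the key callable, it would presumably also set `forbid_in_yaml = True`, as `Switch` does.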

Building blocks

geotoolz.core._src.building_blocks.Identity

Bases: Operator

Explicit no-op. Returns its input unchanged.

Use as a default arm in Branch / Switch rather than passing None — it serialises, composes, shows up in repr(), and makes pipeline structure self-documenting.

Examples:

Branch(predicate=is_clean, if_true=Identity(), if_false=cleanup)

Source code in src/geotoolz/core/_src/building_blocks.py
class Identity(Operator):
    """Explicit no-op. Returns its input unchanged.

    Use as a default arm in ``Branch`` / ``Switch`` rather than passing
    ``None`` — it serialises, composes, shows up in ``repr()``, and makes
    pipeline structure self-documenting.

    Examples:
        ``Branch(predicate=is_clean, if_true=Identity(), if_false=cleanup)``
    """

    def _apply(self, gt: _C) -> _C:
        return gt

geotoolz.core._src.building_blocks.Const

Bases: Operator

Return a fixed value regardless of input.

Useful for golden test fixtures, as a synthetic source in a Switch default, or anywhere the pipeline needs a stand-in.

Parameters:

value (Any, required): The value to return on every call. Typically a GeoTensor for in-pipeline use; can be any object for test scaffolding.

Examples:

Build a deterministic test pipeline::

test_pipeline = Sequential([
    Const(synthetic_gt),    # ignores input
    real_pipeline,
])
Source code in src/geotoolz/core/_src/building_blocks.py
class Const(Operator):
    """Return a fixed value regardless of input.

    Useful for golden test fixtures, as a synthetic source in a
    ``Switch`` default, or anywhere the pipeline needs a stand-in.

    Args:
        value: The value to return on every call. Typically a `GeoTensor`
            for in-pipeline use; can be any object for test scaffolding.

    Examples:
        Build a deterministic test pipeline::

            test_pipeline = Sequential([
                Const(synthetic_gt),    # ignores input
                real_pipeline,
            ])
    """

    def __init__(self, value: Any) -> None:
        self.value = value

    def _apply(self, _: Carrier | None = None) -> Any:
        return self.value

    def get_config(self) -> dict[str, Any]:
        # Best-effort debug repr — the actual value may not be JSON-safe.
        return {
            "value_type": type(self.value).__name__,
            "value_shape": getattr(self.value, "shape", None),
        }

geotoolz.core._src.building_blocks.Lambda

Bases: Operator

Inline-function escape hatch.

Holds a user callable and applies it. The callable's signature is fn(gt) -> result — same contract as Operator._apply. Use when writing a full Operator subclass would be overkill for a one-off transform.

forbid_in_yaml = True — the closure cannot round-trip to YAML faithfully. get_config() returns a debug repr only. The first time a Lambda recurs in your code, promote it to a real Operator subclass.

Parameters:

fn (Callable[[Any], Any], required): A callable (input) -> output.

name (str, default 'lambda'): Display name for repr() and provenance.

Examples:

Lambda(lambda gt: gt * 0.0001, name="scale_to_reflectance")

Source code in src/geotoolz/core/_src/building_blocks.py
class Lambda(Operator):
    """Inline-function escape hatch.

    Holds a user callable and applies it. The callable's signature is
    `fn(gt) -> result` — same contract as `Operator._apply`. Use when
    writing a full `Operator` subclass would be overkill for a one-off
    transform.

    ``forbid_in_yaml = True`` — the closure cannot round-trip to YAML
    faithfully. ``get_config()`` returns a debug repr only. The first
    time a `Lambda` recurs in your code, promote it to a real `Operator`
    subclass.

    Args:
        fn: A callable `(input) -> output`.
        name: Display name for ``repr()`` and provenance. Defaults to
            ``"lambda"``.

    Examples:
        ``Lambda(lambda gt: gt * 0.0001, name="scale_to_reflectance")``
    """

    forbid_in_yaml: ClassVar[bool] = True

    def __init__(self, fn: Callable[[Any], Any], *, name: str = "lambda") -> None:
        self.fn = fn
        self.name = name

    def _apply(self, gt: Any) -> Any:
        return self.fn(gt)

    def get_config(self) -> dict[str, Any]:
        return {"name": self.name}
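Promoting a recurring `Lambda` mostly means moving its constants into `__init__` and reporting them from `get_config()`, which is what lets the operator round-trip where the closure could not. The sketch follows the `Scale` pattern from the top of this page. Because it must run without geotoolz, it defines a hypothetical `MiniOperator` stand-in (eager `__call__` to `_apply` only) instead of importing the real base class.

```python
import numpy as np


class MiniOperator:
    """Stand-in for geotoolz's Operator: eager __call__ -> _apply only."""

    def __call__(self, *args, **kwargs):
        return self._apply(*args, **kwargs)

    def get_config(self):
        return {}


class ScaleToReflectance(MiniOperator):
    """Promoted form of Lambda(lambda gt: gt * 0.0001, name="scale_to_reflectance")."""

    def __init__(self, factor: float = 0.0001) -> None:
        self.factor = factor

    def _apply(self, gt):
        return gt * self.factor

    def get_config(self) -> dict:
        # Unlike Lambda's {"name": ...}, this carries everything needed to rebuild.
        return {"factor": self.factor}


op = ScaleToReflectance()
dn = np.array([1000, 2500], dtype=np.uint16)
reflectance = op(dn)   # float64 values 0.1 and 0.25
print(reflectance, op.get_config())
```

With geotoolz installed, `MiniOperator` would simply be replaced by `geotoolz.Operator`.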

geotoolz.core._src.building_blocks.Sink

Bases: Operator

Composable write: performs a side effect (typically writing to disk) and returns the input unchanged.

Unlike a write op that returns None and breaks the pipe, Sink(write_fn) keeps the GeoTensor flowing. Useful for checkpointing long pipelines, debugging ("what did step 3 produce?"), and branching analysis (write an intermediate, continue with the final product).

forbid_in_yaml = True — the write callable is a closure.

Parameters:

write_fn (Callable[[Any], Any], required): A callable (gt) -> Any whose return value is ignored. Typical use: lambda gt: georeader.save_cog(gt, "/path.tif").

name (str, default 'sink'): Display name for repr() / provenance.

Examples:

Sink(lambda gt: georeader.save_cog(gt, "/intermediate.tif"))

Source code in src/geotoolz/core/_src/building_blocks.py
class Sink(Operator):
    """Composable terminal write — performs a side effect and returns
    the input unchanged.

    Unlike a write op that returns `None` and breaks the pipe,
    ``Sink(write_fn)`` keeps the GeoTensor flowing. Useful for
    checkpointing long pipelines, debugging ("what did step 3 produce?"),
    and branching analysis (write an intermediate, continue with the
    final product).

    ``forbid_in_yaml = True`` — the write callable is a closure.

    Args:
        write_fn: A callable `(gt) -> Any` whose return value is ignored.
            Typical use: ``lambda gt: georeader.save_cog(gt, "/path.tif")``.
        name: Display name for ``repr()`` / provenance. Default
            ``"sink"``.

    Examples:
        ``Sink(lambda gt: georeader.save_cog(gt, "/intermediate.tif"))``
    """

    forbid_in_yaml: ClassVar[bool] = True

    def __init__(
        self,
        write_fn: Callable[[Any], Any],
        *,
        name: str = "sink",
    ) -> None:
        self.write_fn = write_fn
        self.name = name

    def _apply(self, gt: _C) -> _C:
        self.write_fn(gt)
        return gt

    def get_config(self) -> dict[str, Any]:
        return {"name": self.name}
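A checkpointing `Sink` needs only a `write_fn` that persists the intermediate and a known path to read it back from. The sketch mirrors `Sink._apply` with a hypothetical `sink` helper and uses `np.save` into a temporary directory in place of a GeoTIFF writer. Georeferencing is therefore not preserved here; real outputs need a georeferenced writer like the one in the example above.

```python
import os
import tempfile

import numpy as np


def sink(write_fn):
    """Mirror of Sink._apply: call write_fn for its side effect, pass input through."""
    def step(x):
        write_fn(x)
        return x
    return step


def pipeline(x, checkpoint):
    x = x * 2.0          # step 1
    x = checkpoint(x)    # persist the intermediate, keep going
    return x + 1.0       # step 2 (final product)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "step1.npy")
    final = pipeline(np.arange(3.0), sink(lambda a: np.save(path, a)))
    intermediate = np.load(path)  # read back before the directory is removed

print(intermediate, final)  # [0. 2. 4.] [1. 3. 5.]
```

Behaviourally this is the same as `Tap` (call a function, return the input); the separate name documents that the side effect is a write.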