|
| 1 | +######################################################################################### |
| 2 | +## |
| 3 | +## REDUCTION BLOCKS (blocks/divider.py) |
| 4 | +## |
| 5 | +## This module defines static 'Divider' block |
| 6 | +## |
| 7 | +######################################################################################### |
| 8 | + |
| 9 | +# IMPORTS =============================================================================== |
| 10 | + |
| 11 | +import numpy as np |
| 12 | + |
| 13 | +from math import prod |
| 14 | + |
| 15 | +from ._block import Block |
| 16 | +from ..utils.register import Register |
| 17 | +from ..optim.operator import Operator |
| 18 | + |
| 19 | + |
| 20 | +# MISO BLOCKS =========================================================================== |
| 21 | + |
| 22 | +_ZERO_DIV_OPTIONS = ("warn", "raise", "clamp") |
| 23 | + |
| 24 | + |
| 25 | +class Divider(Block): |
| 26 | + """Multiplies and divides input signals (MISO). |
| 27 | +
|
| 28 | + This is the default behavior (multiply all): |
| 29 | +
|
| 30 | + .. math:: |
| 31 | +
|
| 32 | + y(t) = \\prod_i u_i(t) |
| 33 | +
|
| 34 | + and this is the behavior with an operations string: |
| 35 | +
|
| 36 | + .. math:: |
| 37 | +
|
| 38 | + y(t) = \\frac{\\prod_{i \\in M} u_i(t)}{\\prod_{j \\in D} u_j(t)} |
| 39 | +
|
| 40 | + where :math:`M` is the set of inputs with ``*`` and :math:`D` the set with ``/``. |
| 41 | +
|
| 42 | +
|
| 43 | + Example |
| 44 | + ------- |
| 45 | + Default initialization multiplies the first input and divides by the second: |
| 46 | +
|
| 47 | + .. code-block:: python |
| 48 | +
|
| 49 | + D = Divider() |
| 50 | +
|
| 51 | + Multiply the first two inputs and divide by the third: |
| 52 | +
|
| 53 | + .. code-block:: python |
| 54 | +
|
| 55 | + D = Divider('**/') |
| 56 | +
|
| 57 | + Raise an error instead of producing ``inf`` when a denominator input is zero: |
| 58 | +
|
| 59 | + .. code-block:: python |
| 60 | +
|
| 61 | + D = Divider('**/', zero_div='raise') |
| 62 | +
|
| 63 | + Clamp the denominator to machine epsilon so the output stays finite: |
| 64 | +
|
| 65 | + .. code-block:: python |
| 66 | +
|
| 67 | + D = Divider('**/', zero_div='clamp') |
| 68 | +
|
| 69 | +
|
| 70 | + Note |
| 71 | + ---- |
| 72 | + This block is purely algebraic and its operation (``op_alg``) will be called |
| 73 | + multiple times per timestep, each time when ``Simulation._update(t)`` is |
| 74 | + called in the global simulation loop. |
| 75 | +
|
| 76 | +
|
| 77 | + Parameters |
| 78 | + ---------- |
| 79 | + operations : str, optional |
| 80 | + String of ``*`` and ``/`` characters indicating which inputs are |
| 81 | + multiplied (``*``) or divided (``/``). Inputs beyond the length of |
| 82 | + the string default to ``*``. Defaults to ``'*/'`` (divide second |
| 83 | + input by first). |
| 84 | + zero_div : str, optional |
| 85 | + Behaviour when a denominator input is zero. One of: |
| 86 | +
|
| 87 | + ``'warn'`` *(default)* |
| 88 | + Propagates ``inf`` and emits a ``RuntimeWarning`` — numpy's |
| 89 | + standard behaviour. |
| 90 | + ``'raise'`` |
| 91 | + Raises ``ZeroDivisionError``. |
| 92 | + ``'clamp'`` |
| 93 | + Clamps the denominator magnitude to machine epsilon |
| 94 | + (``numpy.finfo(float).eps``), preserving sign, so the output |
| 95 | + stays large-but-finite rather than ``inf``. |
| 96 | +
|
| 97 | +
|
| 98 | + Attributes |
| 99 | + ---------- |
| 100 | + _ops : dict |
| 101 | + Maps operation characters to exponent values (``+1`` or ``-1``). |
| 102 | + _ops_array : numpy.ndarray |
| 103 | + Exponents (+1 for ``*``, -1 for ``/``) converted to an array. |
| 104 | + op_alg : Operator |
| 105 | + Internal algebraic operator. |
| 106 | + """ |
| 107 | + |
| 108 | + input_port_labels = None |
| 109 | + output_port_labels = {"out": 0} |
| 110 | + |
| 111 | + def __init__(self, operations="*/", zero_div="warn"): |
| 112 | + super().__init__() |
| 113 | + |
| 114 | + # validate zero_div |
| 115 | + if zero_div not in _ZERO_DIV_OPTIONS: |
| 116 | + raise ValueError( |
| 117 | + f"'zero_div' must be one of {_ZERO_DIV_OPTIONS}, got '{zero_div}'" |
| 118 | + ) |
| 119 | + self.zero_div = zero_div |
| 120 | + |
| 121 | + # allowed arithmetic operations mapped to exponents |
| 122 | + self._ops = {"*": 1, "/": -1} |
| 123 | + self.operations = operations |
| 124 | + |
| 125 | + if self.operations is None: |
| 126 | + |
| 127 | + # Default: multiply all inputs — identical to Multiplier |
| 128 | + self.op_alg = Operator( |
| 129 | + func=prod, |
| 130 | + jac=lambda x: np.array([[ |
| 131 | + prod(np.delete(x, i)) for i in range(len(x)) |
| 132 | + ]]) |
| 133 | + ) |
| 134 | + |
| 135 | + else: |
| 136 | + |
| 137 | + # input validation |
| 138 | + if not isinstance(self.operations, str): |
| 139 | + raise ValueError("'operations' must be a string or None") |
| 140 | + for op in self.operations: |
| 141 | + if op not in self._ops: |
| 142 | + raise ValueError( |
| 143 | + f"operation '{op}' not in {set(self._ops)}" |
| 144 | + ) |
| 145 | + |
| 146 | + self._ops_array = np.array( |
| 147 | + [self._ops[op] for op in self.operations], dtype=float |
| 148 | + ) |
| 149 | + |
| 150 | + # capture for closures |
| 151 | + _ops_array = self._ops_array |
| 152 | + _zero_div = zero_div |
| 153 | + _eps = np.finfo(float).eps |
| 154 | + |
| 155 | + def _safe_den(d): |
| 156 | + """Apply zero_div policy to a denominator value.""" |
| 157 | + if d == 0: |
| 158 | + if _zero_div == "raise": |
| 159 | + raise ZeroDivisionError( |
| 160 | + "Divider: denominator is zero. " |
| 161 | + "Use zero_div='warn' or 'clamp' to suppress." |
| 162 | + ) |
| 163 | + elif _zero_div == "clamp": |
| 164 | + return _eps |
| 165 | + return d |
| 166 | + |
| 167 | + def prod_ops(X): |
| 168 | + n = len(X) |
| 169 | + no = len(_ops_array) |
| 170 | + ops = np.ones(n) |
| 171 | + ops[:min(n, no)] = _ops_array[:min(n, no)] |
| 172 | + num = prod(X[i] for i in range(n) if ops[i] > 0) |
| 173 | + den = _safe_den(prod(X[i] for i in range(n) if ops[i] < 0)) |
| 174 | + return num / den |
| 175 | + |
| 176 | + def jac_ops(X): |
| 177 | + n = len(X) |
| 178 | + no = len(_ops_array) |
| 179 | + ops = np.ones(n) |
| 180 | + ops[:min(n, no)] = _ops_array[:min(n, no)] |
| 181 | + X = np.asarray(X, dtype=float) |
| 182 | + # Apply zero_div policy to all denominator inputs up front so |
| 183 | + # both the direct division and the rest-product stay consistent. |
| 184 | + X_safe = X.copy() |
| 185 | + for i in range(n): |
| 186 | + if ops[i] < 0: |
| 187 | + X_safe[i] = _safe_den(float(X[i])) |
| 188 | + row = [] |
| 189 | + for k in range(n): |
| 190 | + rest = np.prod( |
| 191 | + np.power(np.delete(X_safe, k), np.delete(ops, k)) |
| 192 | + ) |
| 193 | + if ops[k] > 0: # multiply: dy/du_k = prod of rest |
| 194 | + row.append(rest) |
| 195 | + else: # divide: dy/du_k = -rest / u_k^2 |
| 196 | + row.append(-rest / X_safe[k] ** 2) |
| 197 | + return np.array([row]) |
| 198 | + |
| 199 | + self.op_alg = Operator(func=prod_ops, jac=jac_ops) |
| 200 | + |
| 201 | + |
| 202 | + def __len__(self): |
| 203 | + """Purely algebraic block.""" |
| 204 | + return 1 |
| 205 | + |
| 206 | + |
| 207 | + def update(self, t): |
| 208 | + """Update system equation. |
| 209 | +
|
| 210 | + Parameters |
| 211 | + ---------- |
| 212 | + t : float |
| 213 | + Evaluation time. |
| 214 | + """ |
| 215 | + u = self.inputs.to_array() |
| 216 | + self.outputs.update_from_array(self.op_alg(u)) |
0 commit comments