Skip to content

Commit f18b1d0

Browse files
authored
Merge pull request #202 from kwmcbride/kevin/dev/divider
Kevin/dev/divider
2 parents b84bfda + 7d15213 commit f18b1d0

3 files changed

Lines changed: 522 additions & 0 deletions

File tree

src/pathsim/blocks/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from .differentiator import *
22
from .integrator import *
33
from .multiplier import *
4+
from .divider import *
45
from .converters import *
56
from .comparator import *
67
from .samplehold import *

src/pathsim/blocks/divider.py

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
#########################################################################################
2+
##
3+
## REDUCTION BLOCKS (blocks/divider.py)
4+
##
5+
## This module defines static 'Divider' block
6+
##
7+
#########################################################################################
8+
9+
# IMPORTS ===============================================================================
10+
11+
import numpy as np
12+
13+
from math import prod
14+
15+
from ._block import Block
16+
from ..utils.register import Register
17+
from ..optim.operator import Operator
18+
19+
20+
# MISO BLOCKS ===========================================================================
21+
22+
_ZERO_DIV_OPTIONS = ("warn", "raise", "clamp")
23+
24+
25+
class Divider(Block):
26+
"""Multiplies and divides input signals (MISO).
27+
28+
This is the default behavior (multiply all):
29+
30+
.. math::
31+
32+
y(t) = \\prod_i u_i(t)
33+
34+
and this is the behavior with an operations string:
35+
36+
.. math::
37+
38+
y(t) = \\frac{\\prod_{i \\in M} u_i(t)}{\\prod_{j \\in D} u_j(t)}
39+
40+
where :math:`M` is the set of inputs with ``*`` and :math:`D` the set with ``/``.
41+
42+
43+
Example
44+
-------
45+
Default initialization multiplies the first input and divides by the second:
46+
47+
.. code-block:: python
48+
49+
D = Divider()
50+
51+
Multiply the first two inputs and divide by the third:
52+
53+
.. code-block:: python
54+
55+
D = Divider('**/')
56+
57+
Raise an error instead of producing ``inf`` when a denominator input is zero:
58+
59+
.. code-block:: python
60+
61+
D = Divider('**/', zero_div='raise')
62+
63+
Clamp the denominator to machine epsilon so the output stays finite:
64+
65+
.. code-block:: python
66+
67+
D = Divider('**/', zero_div='clamp')
68+
69+
70+
Note
71+
----
72+
This block is purely algebraic and its operation (``op_alg``) will be called
73+
multiple times per timestep, each time when ``Simulation._update(t)`` is
74+
called in the global simulation loop.
75+
76+
77+
Parameters
78+
----------
79+
operations : str, optional
80+
String of ``*`` and ``/`` characters indicating which inputs are
81+
multiplied (``*``) or divided (``/``). Inputs beyond the length of
82+
the string default to ``*``. Defaults to ``'*/'`` (divide second
83+
input by first).
84+
zero_div : str, optional
85+
Behaviour when a denominator input is zero. One of:
86+
87+
``'warn'`` *(default)*
88+
Propagates ``inf`` and emits a ``RuntimeWarning`` — numpy's
89+
standard behaviour.
90+
``'raise'``
91+
Raises ``ZeroDivisionError``.
92+
``'clamp'``
93+
Clamps the denominator magnitude to machine epsilon
94+
(``numpy.finfo(float).eps``), preserving sign, so the output
95+
stays large-but-finite rather than ``inf``.
96+
97+
98+
Attributes
99+
----------
100+
_ops : dict
101+
Maps operation characters to exponent values (``+1`` or ``-1``).
102+
_ops_array : numpy.ndarray
103+
Exponents (+1 for ``*``, -1 for ``/``) converted to an array.
104+
op_alg : Operator
105+
Internal algebraic operator.
106+
"""
107+
108+
input_port_labels = None
109+
output_port_labels = {"out": 0}
110+
111+
def __init__(self, operations="*/", zero_div="warn"):
112+
super().__init__()
113+
114+
# validate zero_div
115+
if zero_div not in _ZERO_DIV_OPTIONS:
116+
raise ValueError(
117+
f"'zero_div' must be one of {_ZERO_DIV_OPTIONS}, got '{zero_div}'"
118+
)
119+
self.zero_div = zero_div
120+
121+
# allowed arithmetic operations mapped to exponents
122+
self._ops = {"*": 1, "/": -1}
123+
self.operations = operations
124+
125+
if self.operations is None:
126+
127+
# Default: multiply all inputs — identical to Multiplier
128+
self.op_alg = Operator(
129+
func=prod,
130+
jac=lambda x: np.array([[
131+
prod(np.delete(x, i)) for i in range(len(x))
132+
]])
133+
)
134+
135+
else:
136+
137+
# input validation
138+
if not isinstance(self.operations, str):
139+
raise ValueError("'operations' must be a string or None")
140+
for op in self.operations:
141+
if op not in self._ops:
142+
raise ValueError(
143+
f"operation '{op}' not in {set(self._ops)}"
144+
)
145+
146+
self._ops_array = np.array(
147+
[self._ops[op] for op in self.operations], dtype=float
148+
)
149+
150+
# capture for closures
151+
_ops_array = self._ops_array
152+
_zero_div = zero_div
153+
_eps = np.finfo(float).eps
154+
155+
def _safe_den(d):
156+
"""Apply zero_div policy to a denominator value."""
157+
if d == 0:
158+
if _zero_div == "raise":
159+
raise ZeroDivisionError(
160+
"Divider: denominator is zero. "
161+
"Use zero_div='warn' or 'clamp' to suppress."
162+
)
163+
elif _zero_div == "clamp":
164+
return _eps
165+
return d
166+
167+
def prod_ops(X):
168+
n = len(X)
169+
no = len(_ops_array)
170+
ops = np.ones(n)
171+
ops[:min(n, no)] = _ops_array[:min(n, no)]
172+
num = prod(X[i] for i in range(n) if ops[i] > 0)
173+
den = _safe_den(prod(X[i] for i in range(n) if ops[i] < 0))
174+
return num / den
175+
176+
def jac_ops(X):
177+
n = len(X)
178+
no = len(_ops_array)
179+
ops = np.ones(n)
180+
ops[:min(n, no)] = _ops_array[:min(n, no)]
181+
X = np.asarray(X, dtype=float)
182+
# Apply zero_div policy to all denominator inputs up front so
183+
# both the direct division and the rest-product stay consistent.
184+
X_safe = X.copy()
185+
for i in range(n):
186+
if ops[i] < 0:
187+
X_safe[i] = _safe_den(float(X[i]))
188+
row = []
189+
for k in range(n):
190+
rest = np.prod(
191+
np.power(np.delete(X_safe, k), np.delete(ops, k))
192+
)
193+
if ops[k] > 0: # multiply: dy/du_k = prod of rest
194+
row.append(rest)
195+
else: # divide: dy/du_k = -rest / u_k^2
196+
row.append(-rest / X_safe[k] ** 2)
197+
return np.array([row])
198+
199+
self.op_alg = Operator(func=prod_ops, jac=jac_ops)
200+
201+
202+
def __len__(self):
203+
"""Purely algebraic block."""
204+
return 1
205+
206+
207+
def update(self, t):
208+
"""Update system equation.
209+
210+
Parameters
211+
----------
212+
t : float
213+
Evaluation time.
214+
"""
215+
u = self.inputs.to_array()
216+
self.outputs.update_from_array(self.op_alg(u))

0 commit comments

Comments
 (0)