Skip to content

Commit 80a3810

Browse files
committed
adding divider block
1 parent b84bfda commit 80a3810

3 files changed

Lines changed: 520 additions & 0 deletions

File tree

src/pathsim/blocks/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from .differentiator import *
22
from .integrator import *
33
from .multiplier import *
4+
from .divider import *
45
from .converters import *
56
from .comparator import *
67
from .samplehold import *

src/pathsim/blocks/divider.py

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
#########################################################################################
2+
##
3+
## REDUCTION BLOCKS (blocks/divider.py)
4+
##
5+
## This module defines static 'Divider' block
6+
##
7+
#########################################################################################
8+
9+
# IMPORTS ===============================================================================
10+
11+
import numpy as np
12+
13+
from math import prod
14+
15+
from ._block import Block
16+
from ..utils.register import Register
17+
from ..optim.operator import Operator
18+
19+
20+
# MISO BLOCKS ===========================================================================
21+
22+
_ZERO_DIV_OPTIONS = ("warn", "raise", "clamp")
23+
24+
25+
class Divider(Block):
26+
"""Multiplies and divides input signals (MISO).
27+
28+
This is the default behavior (multiply all):
29+
30+
.. math::
31+
32+
y(t) = \\prod_i u_i(t)
33+
34+
and this is the behavior with an operations string:
35+
36+
.. math::
37+
38+
y(t) = \\frac{\\prod_{i \\in M} u_i(t)}{\\prod_{j \\in D} u_j(t)}
39+
40+
where :math:`M` is the set of inputs with ``*`` and :math:`D` the set with ``/``.
41+
42+
43+
Example
44+
-------
45+
Default initialization multiplies all inputs (same as :class:`Multiplier`):
46+
47+
.. code-block:: python
48+
49+
D = Divider()
50+
51+
Multiply the first two inputs and divide by the third:
52+
53+
.. code-block:: python
54+
55+
D = Divider('**/')
56+
57+
Raise an error instead of producing ``inf`` when a denominator input is zero:
58+
59+
.. code-block:: python
60+
61+
D = Divider('**/', zero_div='raise')
62+
63+
Clamp the denominator to machine epsilon so the output stays finite:
64+
65+
.. code-block:: python
66+
67+
D = Divider('**/', zero_div='clamp')
68+
69+
70+
Note
71+
----
72+
This block is purely algebraic and its operation (``op_alg``) will be called
73+
multiple times per timestep, each time when ``Simulation._update(t)`` is
74+
called in the global simulation loop.
75+
76+
77+
Parameters
78+
----------
79+
operations : str, optional
80+
String of ``*`` and ``/`` characters indicating which inputs are
81+
multiplied (``*``) or divided (``/``). Inputs beyond the length of
82+
the string default to ``*``. ``None`` multiplies all inputs.
83+
zero_div : str, optional
84+
Behaviour when a denominator input is zero. One of:
85+
86+
``'warn'`` *(default)*
87+
Propagates ``inf`` and emits a ``RuntimeWarning`` — numpy's
88+
standard behaviour.
89+
``'raise'``
90+
Raises ``ZeroDivisionError``.
91+
``'clamp'``
92+
Clamps the denominator magnitude to machine epsilon
93+
(``numpy.finfo(float).eps``), preserving sign, so the output
94+
stays large-but-finite rather than ``inf``.
95+
96+
97+
Attributes
98+
----------
99+
_ops : dict
100+
Maps operation characters to exponent values (``+1`` or ``-1``).
101+
_ops_array : numpy.ndarray
102+
Exponents (+1 for ``*``, -1 for ``/``) converted to an array.
103+
op_alg : Operator
104+
Internal algebraic operator.
105+
"""
106+
107+
input_port_labels = None
108+
output_port_labels = {"out": 0}
109+
110+
def __init__(self, operations=None, zero_div="warn"):
111+
super().__init__()
112+
113+
# validate zero_div
114+
if zero_div not in _ZERO_DIV_OPTIONS:
115+
raise ValueError(
116+
f"'zero_div' must be one of {_ZERO_DIV_OPTIONS}, got '{zero_div}'"
117+
)
118+
self.zero_div = zero_div
119+
120+
# allowed arithmetic operations mapped to exponents
121+
self._ops = {"*": 1, "/": -1}
122+
self.operations = operations
123+
124+
if self.operations is None:
125+
126+
# Default: multiply all inputs — identical to Multiplier
127+
self.op_alg = Operator(
128+
func=prod,
129+
jac=lambda x: np.array([[
130+
prod(np.delete(x, i)) for i in range(len(x))
131+
]])
132+
)
133+
134+
else:
135+
136+
# input validation
137+
if not isinstance(self.operations, str):
138+
raise ValueError("'operations' must be a string or None")
139+
for op in self.operations:
140+
if op not in self._ops:
141+
raise ValueError(
142+
f"operation '{op}' not in {set(self._ops)}"
143+
)
144+
145+
self._ops_array = np.array(
146+
[self._ops[op] for op in self.operations], dtype=float
147+
)
148+
149+
# capture for closures
150+
_ops_array = self._ops_array
151+
_zero_div = zero_div
152+
_eps = np.finfo(float).eps
153+
154+
def _safe_den(d):
155+
"""Apply zero_div policy to a denominator value."""
156+
if d == 0:
157+
if _zero_div == "raise":
158+
raise ZeroDivisionError(
159+
"Divider: denominator is zero. "
160+
"Use zero_div='warn' or 'clamp' to suppress."
161+
)
162+
elif _zero_div == "clamp":
163+
return _eps
164+
return d
165+
166+
def prod_ops(X):
167+
n = len(X)
168+
no = len(_ops_array)
169+
ops = np.ones(n)
170+
ops[:min(n, no)] = _ops_array[:min(n, no)]
171+
num = prod(X[i] for i in range(n) if ops[i] > 0)
172+
den = _safe_den(prod(X[i] for i in range(n) if ops[i] < 0))
173+
return num / den
174+
175+
def jac_ops(X):
176+
n = len(X)
177+
no = len(_ops_array)
178+
ops = np.ones(n)
179+
ops[:min(n, no)] = _ops_array[:min(n, no)]
180+
X = np.asarray(X, dtype=float)
181+
# Apply zero_div policy to all denominator inputs up front so
182+
# both the direct division and the rest-product stay consistent.
183+
X_safe = X.copy()
184+
for i in range(n):
185+
if ops[i] < 0:
186+
X_safe[i] = _safe_den(float(X[i]))
187+
row = []
188+
for k in range(n):
189+
rest = np.prod(
190+
np.power(np.delete(X_safe, k), np.delete(ops, k))
191+
)
192+
if ops[k] > 0: # multiply: dy/du_k = prod of rest
193+
row.append(rest)
194+
else: # divide: dy/du_k = -rest / u_k^2
195+
row.append(-rest / X_safe[k] ** 2)
196+
return np.array([row])
197+
198+
self.op_alg = Operator(func=prod_ops, jac=jac_ops)
199+
200+
201+
def __len__(self):
202+
"""Purely algebraic block."""
203+
return 1
204+
205+
206+
def update(self, t):
207+
"""Update system equation.
208+
209+
Parameters
210+
----------
211+
t : float
212+
Evaluation time.
213+
"""
214+
u = self.inputs.to_array()
215+
self.outputs.update_from_array(self.op_alg(u))

0 commit comments

Comments
 (0)