|
| 1 | +"""A minimal implementation of the Common Lisp conditions system for Python. |
| 2 | +
|
| 3 | +To keep this simple, no debugger support. And no implicit "no such function, |
| 4 | +what would you like to do?" hook on function calls. To use conditions, you have |
| 5 | +to explicitly ask for them. |
| 6 | +
|
| 7 | +This module exports four forms: `signal`, `invoke_restart`, `with restarts`, |
| 8 | +and `with handlers`, which interlock in a very particular way. Usage:: |
| 9 | +
|
| 10 | + def lowlevel(): |
| 11 | + # define here what actions are available when stuff goes wrong |
| 12 | + # in this low-level code |
| 13 | + with restarts(use_value=..., |
| 14 | + do_something_else=...): |
| 15 | + ... |
| 16 | + # When stuff goes wrong, ask the caller what we should do |
| 17 | + x = ... if our_usual_case else signal("help_me") |
| 18 | + ... |
| 19 | +
|
| 20 | + # high-level code - choose here which action to take for each named signal |
| 21 | + with handlers(help_me=(lambda: invoke_restart("use_value"))): |
| 22 | + lowlevel() |
| 23 | +
|
| 24 | +This arrangement improves modularity. The high-level code may reside in a |
| 25 | +completely different part of the application, and/or be written only much |
| 26 | +later. The implementer of low-level code can provide a set of canned |
| 27 | +error-recovery strategies *appropriate for that low-level code, for any future |
| 28 | +user*, but leave to each call site the ultimate decision of which strategy to |
| 29 | +pick in any particular use case. |
| 30 | +
|
| 31 | +
|
| 32 | +**Acknowledgements**: |
| 33 | +
|
| 34 | +Big thanks to Alexander Artemenko (@svetlyak40wt) for the original library this |
| 35 | +module is based on: |
| 36 | +
|
| 37 | + https://github.com/svetlyak40wt/python-cl-conditions/ |
| 38 | +
|
| 39 | +To understand conditions, see *Chapter 19: Beyond Exception Handling: |
| 40 | +Conditions and Restarts* in *Practical Common Lisp* by Peter Seibel (2005): |
| 41 | +
|
| 42 | + http://www.gigamonkeys.com/book/beyond-exception-handling-conditions-and-restarts.html |
| 43 | +""" |
| 44 | + |
| 45 | +__all__ = ["signal", "invoke_restart", "restarts", "handlers"] |
| 46 | + |
| 47 | +import threading |
| 48 | +from collections import deque |
| 49 | + |
| 50 | +_stacks = threading.local() |
| 51 | +def _ensure_stacks(): # per-thread init |
| 52 | + for x in ("restarts", "handlers"): |
| 53 | + if not hasattr(_stacks, x): |
| 54 | + setattr(_stacks, x, deque()) |
| 55 | + |
| 56 | +class ConditionError(RuntimeError): |
| 57 | + """Represents a runtime error detected by the conditions system.""" |
| 58 | + |
| 59 | +def signal(condition_name, *args, **kwargs): |
| 60 | + """Signal a condition. |
| 61 | +
|
| 62 | + Any args and kwargs are passed through to the condition handler. |
| 63 | +
|
| 64 | + The return value of `signal()` is the return value of the restart that was |
| 65 | + invoked by the handler that chose to handle the condition. |
| 66 | +
|
| 67 | + Call `signal` in your low-level logic to indicate that something exceptional |
| 68 | + just occurred, and a *condition handler* defined in higher-level code now |
| 69 | + needs to choose which recovery strategy (out of those defined by the |
| 70 | + low-level code) should be taken. |
| 71 | +
|
| 72 | + To handle the condition, a handler must call `invoke_restart()` for one of |
| 73 | + the restarts currently in scope. This immediately terminates the handler, |
| 74 | + transferring control to the restart. |
| 75 | +
|
| 76 | + To instead cancel, and delegate to the next (outer) handler for the same |
| 77 | + condition type, a handler may return normally without calling |
| 78 | + `invoke_restart()`. The return value of the handler is ignored. |
| 79 | + """ |
| 80 | + try: |
| 81 | + for handler in _find_handlers(condition_name): |
| 82 | + # Since the handler is called normally, it does not unwind the call stack. |
| 83 | + # We remain inside the `signal()` call in the low-level code. |
| 84 | + handler(*args, **kwargs) |
| 85 | + except _DelayedCall as invoke: |
| 86 | + return invoke() |
| 87 | + else: |
| 88 | + raise ConditionError("Unhandled condition '{}'".format(condition_name)) |
| 89 | + |
| 90 | +def invoke_restart(restart_name, *args, **kwargs): |
| 91 | + """Invoke a restart currently in scope. |
| 92 | +
|
| 93 | + restart_name is used to look up the most recently bound restart matching |
| 94 | + the name. |
| 95 | +
|
| 96 | + Any args and kwargs are passed through to the restart. |
| 97 | +
|
| 98 | + To handle the condition, call `invoke_restart` from inside your condition |
| 99 | + handler in your high-level logic. The call immediately terminates the |
| 100 | + handler, transferring control to the restart. |
| 101 | +
|
| 102 | + To instead cancel, and delegate to the next (outer) handler for the same |
| 103 | + condition type, a handler may return normally without calling |
| 104 | + `invoke_restart()`. |
| 105 | + """ |
| 106 | + f = _find_restart(restart_name) |
| 107 | + raise _DelayedCall(f, *args, **kwargs) |
| 108 | + |
| 109 | +class _Stacked: # boilerplate |
| 110 | + def __init__(self, **bindings): |
| 111 | + _ensure_stacks() |
| 112 | + self.e = bindings |
| 113 | + def __enter__(self): |
| 114 | + self.dq.appendleft(self.e) |
| 115 | + return self |
| 116 | + def __exit__(self, exctype, excvalue, traceback): |
| 117 | + self.dq.popleft() |
| 118 | + |
| 119 | +class restarts(_Stacked): |
| 120 | + """Context manager: a dynamic let for restarts.""" |
| 121 | + def __init__(self, **bindings): |
| 122 | + """binding: name (str) -> callable""" |
| 123 | + super().__init__(**bindings) |
| 124 | + self.dq = _stacks.restarts |
| 125 | + |
| 126 | +class handlers(_Stacked): |
| 127 | + """Context manager: a dynamic let for condition handlers.""" |
| 128 | + def __init__(self, **bindings): |
| 129 | + """binding: name (str) -> callable""" |
| 130 | + super().__init__(**bindings) |
| 131 | + self.dq = _stacks.handlers |
| 132 | + |
| 133 | +class _DelayedCall(Exception): |
| 134 | + def __init__(self, f, *args, **kwargs): |
| 135 | + self.f, self.args, self.kwargs = f, args, kwargs |
| 136 | + def __call__(self): |
| 137 | + return self.f(*self.args, **self.kwargs) |
| 138 | + |
| 139 | +def _find_handlers(name): # 0..n (though 0 is an error, handled at the calling end) |
| 140 | + for e in _stacks.handlers: |
| 141 | + if name in e: |
| 142 | + yield e[name] |
| 143 | + |
| 144 | +def _find_restart(name): # exactly 1 (most recently bound wins) |
| 145 | + for e in _stacks.restarts: |
| 146 | + if name in e: |
| 147 | + return e[name] |
| 148 | + raise ConditionError("Restart '{}' not found".format(name)) |
0 commit comments