- Problem Overview
- Goals & Requirements
- High-Level Design
- Low-Level Design
- Data Structures
- Public API & Usage
- Extensibility & Runtime Updates
- Performance Considerations
- Edge Cases & Error Handling
- Testing & Validation
Refer to PROBLEM.md for a detailed problem statement, motivations, and challenges in streaming LLM output safely in real time.
- Prefix-safe multi-pattern detection over a character stream.
- Streaming sanitization with <5 µs overhead per token.
- Support PASS, DROP, REPLACE(text), HALT, CONTINUE_DROP/PASS callbacks.
- Sync and async generator integration via a decorator.
- Optional history tracking with minimal overhead.
- Dynamic rule registration/deregistration at runtime.
Client Token Generator (sync/async)
│
▼
@llm_stream_processor ← Public API decorator
│
▼
StreamProcessor Core ← Character-level engine
┌───────────────┐
│ Aho–Corasick │
│ Automaton │
└───────────────┘
│
▼
Lazy Buffer + Callbacks ← apply user decisions
│
▼
Re-packer (char/token/chunk)
│
▼
Consumer
- Decorator wraps the token generator, intercepting each yielded token.
- Token split into individual characters to maintain prefix safety.
- Each character fed to `StreamProcessor.process()`:
  - Advance the Aho–Corasick state, following failure links.
  - On match, invoke callbacks in registration order.
  - Apply decisions (PASS/DROP/REPLACE/HALT/CONTINUE_DROP/PASS) to the buffer.
  - Lazy flush: once the buffer exceeds the longest keyword length, emit or drop the oldest character.
- `@llm_stream_processor` repacks flushed characters into the requested output mode.
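The wrap, split, and repack flow can be illustrated with a toy decorator. This is only a sketch under stated assumptions: `char_stream_processor` and `make_process` are hypothetical names, the per-character processor is a trivial stand-in rather than the Aho–Corasick engine, and only sync generators with per-token repacking are handled. A real processor would also need a final flush when the stream ends.

```python
import functools


def char_stream_processor(make_process):
    """Toy decorator (hypothetical name). make_process() must return a callable
    that maps one input character to a string of zero or more output characters."""
    def decorator(gen_fn):
        @functools.wraps(gen_fn)
        def wrapper(*args, **kwargs):
            process = make_process()  # fresh processor state for each stream
            for token in gen_fn(*args, **kwargs):
                # Split the token into characters, process each one,
                # then repack the surviving characters per input token.
                out = "".join(process(ch) for ch in token)
                if out:
                    yield out
        return wrapper
    return decorator


# Stand-in processor: drops lowercase vowels, passes everything else through.
def make_vowel_dropper():
    return lambda ch: "" if ch in "aeiou" else ch


@char_stream_processor(make_vowel_dropper)
def demo_chat():
    yield "hello "
    yield "world"


demo_output = list(demo_chat())
```

Because the wrapper is itself a generator, nothing is consumed from the wrapped generator until the caller starts iterating.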
- Trie Construction: Insert each keyword, storing callback lists at terminal nodes.
- Failure Links: BFS to point each node to the next longest suffix node.
- Output Merge: Each node accumulates outputs from its failure link for multi-pattern support.
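The three construction steps above can be sketched as follows. This is a minimal, generic Aho–Corasick build, not the project's actual code: `Node`, `build_automaton`, and `find_all` are illustrative names, and the outputs hold plain keywords rather than (keyword, callbacks) pairs.

```python
from collections import deque


class Node:
    """One trie node; field names are illustrative."""

    def __init__(self):
        self.children = {}  # char -> Node
        self.fail = None    # failure link: longest proper suffix that is a trie path
        self.outputs = []   # keywords recognised when this node is reached


def build_automaton(keywords):
    root = Node()
    # 1. Trie construction: one path per keyword.
    for kw in keywords:
        node = root
        for ch in kw:
            node = node.children.setdefault(ch, Node())
        node.outputs.append(kw)

    # 2. Failure links via BFS, so shallower nodes are finished first.
    root.fail = root
    queue = deque()
    for child in root.children.values():
        child.fail = root
        queue.append(child)
    while queue:
        node = queue.popleft()
        for ch, child in node.children.items():
            f = node.fail
            while f is not root and ch not in f.children:
                f = f.fail
            child.fail = f.children[ch] if ch in f.children else root
            # 3. Output merge: inherit keywords that end at the suffix node.
            child.outputs = child.outputs + child.fail.outputs
            queue.append(child)
    return root


def find_all(root, text):
    """Return (end_index, keyword) for every match in text."""
    hits = []
    node = root
    for i, ch in enumerate(text):
        while node is not root and ch not in node.children:
            node = node.fail
        node = node.children.get(ch, root)
        for kw in node.outputs:
            hits.append((i, kw))
    return hits


hits = find_all(build_automaton(["he", "she", "hers"]), "ushers")
# hits == [(3, "she"), (3, "he"), (5, "hers")]
```

The output merge is what lets "he" be reported at the same position as "she" without a second scan.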
- Buffer incoming chars in a deque up to `max_keyword_length`.
- Ensures that partial matches are not prematurely emitted.
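A minimal sketch of the lazy buffer, assuming a plain character iterable as input and no matching logic at all. The function name `lazy_flush` is hypothetical; the point is only to show when a character becomes safe to release.

```python
from collections import deque


def lazy_flush(chars, max_keyword_length):
    """Yield characters only once they can no longer belong to a pending match."""
    buffer = deque()
    for ch in chars:
        buffer.append(ch)
        # Anything older than the longest keyword cannot be part of a match
        # that is still in progress, so it is safe to release.
        while len(buffer) > max_keyword_length:
            yield buffer.popleft()
    # End of stream: nothing more can match, so drain what is left.
    while buffer:
        yield buffer.popleft()


consumed = []


def source():
    # Instrumented input that records how far the consumer has read.
    for ch in "abcdef":
        consumed.append(ch)
        yield ch


gen = lazy_flush(source(), 3)
first = next(gen)               # "a" is released only after "d" has been read
consumed_at_first = list(consumed)
rest = "".join(gen)
```

With `max_keyword_length=3`, the first character is held back until the fourth has arrived, which is the delay that keeps partial matches from leaking.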
On detecting the longest match at a node:
- Build `ActionContext(keyword, buffer snapshot, position, history)`.
- Iterate callbacks:
  - PASS: no change.
  - DROP: pop the keyword length from the buffer.
  - REPLACE(text): remove keyword and append replacement chars.
  - HALT: remove keyword, mark halted, raise `StreamHalted`.
  - CONTINUE_DROP / PASS: toggle drop mode, flush buffered content accordingly.
- Reset automaton state to root to detect overlapping patterns.
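The buffer edits for the simpler decisions could look roughly like this. It is a sketch under assumptions: decisions are modelled as plain strings, `apply_decision` is a hypothetical helper, the `StreamHalted` class here is a local stand-in, and the matched keyword is assumed to occupy the tail of the buffer. The CONTINUE_DROP / PASS drop-mode toggle is left out.

```python
from collections import deque


class StreamHalted(Exception):
    """Local stand-in for the exception named in the text."""


def apply_decision(buffer, keyword, decision, replacement=""):
    """Mutate buffer for one decision. Assumes the matched keyword is the
    last len(keyword) characters of the buffer."""
    if decision == "PASS":
        return
    if decision not in ("DROP", "REPLACE", "HALT"):
        raise ValueError(f"unknown decision: {decision!r}")
    # DROP, REPLACE and HALT all start by removing the matched keyword.
    for _ in range(len(keyword)):
        buffer.pop()
    if decision == "REPLACE":
        buffer.extend(replacement)  # append replacement one char at a time
    elif decision == "HALT":
        raise StreamHalted(keyword)


buf = deque("The secret")
apply_decision(buf, "secret", "REPLACE", "[REDACTED]")
replaced = "".join(buf)

buf2 = deque("Please stop")
try:
    apply_decision(buf2, "stop", "HALT")
    halted = False
except StreamHalted:
    halted = True
remaining = "".join(buf2)
```

Removing the keyword before raising on HALT means whatever preceded the keyword is still in the buffer and can be flushed by the caller.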
- _Node: children dict, failure link, output list of (keyword, callbacks).
- deque(buffer): O(1) append/popleft for lazy flushing.
- StreamHistory / NullHistory: track or stub input/output/action logs.
- ActionContext: immutable snapshot for callbacks.
- ActionDecision: encapsulates callback return instructions.
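As an illustration of the shapes involved, the context and decision objects could be modelled as frozen dataclasses, with the history stub following the null-object pattern. All field and method names below are assumptions made for this sketch; the project's real definitions may differ.

```python
from dataclasses import dataclass, FrozenInstanceError
from typing import Optional


class StreamHistory:
    """Records what passed through (method name is hypothetical)."""

    def __init__(self):
        self.inputs = []

    def record_input(self, ch):
        self.inputs.append(ch)


class NullHistory:
    """Same interface, does nothing, so callers never need an `if` check."""

    def record_input(self, ch):
        pass


@dataclass(frozen=True)
class ActionContext:
    keyword: str
    buffer: str                      # snapshot of the buffer at match time
    position: int                    # stream offset; exact meaning assumed here
    history: Optional[object] = None


@dataclass(frozen=True)
class ActionDecision:
    kind: str                        # e.g. "PASS", "DROP", "REPLACE", "HALT"
    replacement: Optional[str] = None


ctx = ActionContext(keyword="secret", buffer="The secret", position=10)
try:
    ctx.keyword = "other"            # frozen dataclasses reject mutation
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Freezing the context keeps a callback from accidentally altering state that later callbacks for the same match will read.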
- KeywordRegistry: register/deregister keywords & callbacks, compile automaton.
- @llm_stream_processor: decorator for sync/async generators.
- Helper Actions: `drop()`, `replace()`, `halt()`, `continuous_drop()`, etc.
```python
from stream_processor import KeywordRegistry, llm_stream_processor, replace, halt

reg = KeywordRegistry()
reg.register('secret', replace('[REDACTED]'))
reg.register('stop', halt)

@llm_stream_processor(reg, yield_mode='token')
def chat():
    yield 'The secret is out.'
    yield 'Please stop here.'
    yield 'No more.'

print(list(chat()))  # ['The [REDACTED] is out.', 'Please ']
```

- Dynamic `register()`/`deregister()` calls on `KeywordRegistry` followed by `compile()` rebuild.
- Toggle `record_history` to optimize memory vs. introspection.
- Pre-compute `max_keyword_length` to bound buffer size.
- Minimize per-char allocations; use local variables.
- Defer `.join()` calls to flush points, not in hot loops.
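The last two points can be illustrated with a small chunk repacker. This is a hedged sketch, not the project's re-packer: `repack_chunks` and its fixed `chunk_size` are hypothetical, chosen only to show one join per flush and a local alias for the hot-loop method.

```python
def repack_chunks(chars, chunk_size):
    """Group a character stream into strings of at most chunk_size characters."""
    parts = []
    append = parts.append  # local alias avoids an attribute lookup per character
    for ch in chars:
        append(ch)
        if len(parts) >= chunk_size:
            yield "".join(parts)  # join once per flush, not once per character
            parts.clear()         # reuse the same list so the alias stays valid
    if parts:
        yield "".join(parts)      # final partial chunk at end of stream


chunks = list(repack_chunks("abcdefg", 3))
# chunks == ["abc", "def", "g"]
```

Building the string with repeated `+=` inside the loop would create a new string object for each character, which is the allocation pattern the bullets above advise against.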
- Non-matching streams: flush buffer at end.
- Invalid `yield_mode` or decorator misuse → clear exceptions.
- Callback exceptions bubble up to abort processing.
- 31 unit tests cover registry, processor, decorator, actions, history.
- Sync & async flows, overlapping patterns, edge cases, HALT segments.
- Run with `python -m unittest discover tests`.