|
57 | 57 | ] |
58 | 58 |
|
59 | 59 |
|
60 | | -class EventScrubber: |
61 | | - def __init__(self, denylist=None): |
62 | | - # type: (Optional[List[str]]) -> None |
| 60 | +class EventScrubber(object): |
| 61 | + def __init__(self, denylist=None, recursive=False): |
| 62 | + # type: (Optional[List[str]], bool) -> None |
63 | 63 | self.denylist = DEFAULT_DENYLIST if denylist is None else denylist |
64 | 64 | self.denylist = [x.lower() for x in self.denylist] |
| 65 | + self.recursive = recursive |
| 66 | + |
| 67 | + def scrub_list(self, lst): |
| 68 | + # type: (List[Any]) -> None |
| 69 | + """ |
| 70 | + If a list is passed to this method, the method recursively searches the list and any |
| 71 | + nested lists for any dictionaries. The method calls scrub_dict on all dictionaries |
| 72 | + it finds. |
| 73 | + If the parameter passed to this method is not a list, the method does nothing. |
| 74 | + """ |
| 75 | + if not isinstance(lst, list): |
| 76 | + return |
| 77 | + |
| 78 | + for v in lst: |
| 79 | + if isinstance(v, dict): |
| 80 | + self.scrub_dict(v) |
| 81 | + elif isinstance(v, list): |
| 82 | + self.scrub_list(v) |
65 | 83 |
|
66 | 84 | def scrub_dict(self, d): |
67 | 85 | # type: (Dict[str, Any]) -> None |
68 | 86 | if not isinstance(d, dict): |
69 | 87 | return |
70 | 88 |
|
71 | | - for k in d.keys(): |
| 89 | + for k, v in d.items(): |
72 | 90 | if isinstance(k, str) and k.lower() in self.denylist: |
73 | 91 | d[k] = AnnotatedValue.substituted_because_contains_sensitive_data() |
| 92 | + elif self.recursive: |
| 93 | + if isinstance(v, dict): |
| 94 | + self.scrub_dict(v) |
| 95 | + elif isinstance(v, list): |
| 96 | + self.scrub_list(v) |
74 | 97 |
|
75 | 98 | def scrub_request(self, event): |
76 | 99 | # type: (Event) -> None |
|
0 commit comments