forked from andialbrecht/sqlparse
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfilter.py
More file actions
98 lines (79 loc) · 2.67 KB
/
filter.py
File metadata and controls
98 lines (79 loc) · 2.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# -*- coding: utf-8 -*-
from sqlparse import tokens as T
from sqlparse.engine.grouping import Statement, Token
class TokenFilter(object):
def __init__(self, **options):
self.options = options
def process(self, stack, stream):
"""Process token stream."""
raise NotImplementedError
class StatementFilter(TokenFilter):
def __init__(self):
self._in_declare = False
self._in_dbldollar = False
self._is_create = False
def _reset(self):
self._in_declare = False
self._in_dbldollar = False
self._is_create = False
def _change_splitlevel(self, ttype, value):
# PostgreSQL
if (ttype == T.Name.Builtin
and value.startswith('$') and value.endswith('$')):
if self._in_dbldollar:
self._in_dbldollar = False
return -1
else:
self._in_dbldollar = True
return 1
elif self._in_dbldollar:
return 0
# ANSI
if ttype is not T.Keyword:
return 0
unified = value.upper()
if unified == 'DECLARE':
self._in_declare = True
return 1
if unified == 'BEGIN':
if self._in_declare:
return 0
return 0
if unified == 'END':
# Should this respect a preceeding BEGIN?
# In CASE ... WHEN ... END this results in a split level -1.
return -1
if ttype is T.Keyword.DDL and unified.startswith('CREATE'):
self._is_create = True
if unified in ('IF', 'FOR') and self._is_create:
return 1
# Default
return 0
def process(self, stack, stream):
splitlevel = 0
stmt = None
consume_ws = False
stmt_tokens = []
for ttype, value in stream:
# Before appending the token
if (consume_ws and ttype is not T.Whitespace
and ttype is not T.Comment.Single):
consume_ws = False
stmt.tokens = stmt_tokens
yield stmt
self._reset()
stmt = None
splitlevel = 0
if stmt is None:
stmt = Statement()
stmt_tokens = []
splitlevel += self._change_splitlevel(ttype, value)
# Append the token
stmt_tokens.append(Token(ttype, value))
# After appending the token
if (splitlevel <= 0 and ttype is T.Punctuation
and value == ';'):
consume_ws = True
if stmt is not None:
stmt.tokens = stmt_tokens
yield stmt