|
| 1 | +''' |
| 2 | +Created on 13/02/2012 |
| 3 | +
|
| 4 | +@author: piranna |
| 5 | +''' |
| 6 | +from unittest import main, TestCase |
| 7 | + |
| 8 | +from sqlparse.filters import Tokens2Unicode |
| 9 | +from sqlparse.lexer import tokenize |
| 10 | + |
| 11 | +import sys |
| 12 | +sys.path.insert(0, '..') |
| 13 | + |
| 14 | +from sqlparse.filters import Compact |
| 15 | +from sqlparse.functions import getcolumns, getlimit, IsType |
| 16 | + |
| 17 | + |
| 18 | +class Test_SQL(TestCase): |
| 19 | + sql = """-- type: script |
| 20 | + -- return: integer |
| 21 | +
|
| 22 | + INCLUDE "_Make_DirEntry.sql"; |
| 23 | +
|
| 24 | + INSERT INTO directories(inode) |
| 25 | + VALUES(:inode) |
| 26 | + LIMIT 1""" |
| 27 | + |
| 28 | + sql2 = """SELECT child_entry,asdf AS inode, creation |
| 29 | + FROM links |
| 30 | + WHERE parent_dir == :parent_dir AND name == :name |
| 31 | + LIMIT 1""" |
| 32 | + |
| 33 | + sql3 = """SELECT |
| 34 | + 0 AS st_dev, |
| 35 | + 0 AS st_uid, |
| 36 | + 0 AS st_gid, |
| 37 | +
|
| 38 | + dir_entries.type AS st_mode, |
| 39 | + dir_entries.inode AS st_ino, |
| 40 | + COUNT(links.child_entry) AS st_nlink, |
| 41 | +
|
| 42 | + :creation AS st_ctime, |
| 43 | + dir_entries.access AS st_atime, |
| 44 | + dir_entries.modification AS st_mtime, |
| 45 | +-- :creation AS st_ctime, |
| 46 | +-- CAST(STRFTIME('%s',dir_entries.access) AS INTEGER) AS st_atime, |
| 47 | +-- CAST(STRFTIME('%s',dir_entries.modification) AS INTEGER) AS st_mtime, |
| 48 | +
|
| 49 | + COALESCE(files.size,0) AS st_size, -- Python-FUSE |
| 50 | + COALESCE(files.size,0) AS size -- PyFilesystem |
| 51 | +
|
| 52 | +FROM dir_entries |
| 53 | + LEFT JOIN files |
| 54 | + ON dir_entries.inode == files.inode |
| 55 | + LEFT JOIN links |
| 56 | + ON dir_entries.inode == links.child_entry |
| 57 | +
|
| 58 | +WHERE dir_entries.inode == :inode |
| 59 | +
|
| 60 | +GROUP BY dir_entries.inode |
| 61 | +LIMIT 1""" |
| 62 | + |
| 63 | + |
| 64 | +class Test_Compact(Test_SQL): |
| 65 | + def test_compact1(self): |
| 66 | + self.assertEqual(Tokens2Unicode(Compact(self.sql, 'tests/files')), |
| 67 | + 'INSERT INTO dir_entries(type)VALUES(:type);INSERT INTO ' |
| 68 | + 'directories(inode)VALUES(:inode)LIMIT 1') |
| 69 | + |
| 70 | + def test_compact2(self): |
| 71 | + self.assertEqual(Tokens2Unicode(Compact(self.sql2)), |
| 72 | + 'SELECT child_entry,asdf AS inode,creation FROM links WHERE ' |
| 73 | + 'parent_dir==:parent_dir AND name==:name LIMIT 1') |
| 74 | + |
| 75 | + def test_compact3(self): |
| 76 | + self.assertEqual(Tokens2Unicode(Compact(self.sql3)), |
| 77 | + 'SELECT 0 AS st_dev,0 AS st_uid,0 AS st_gid,dir_entries.type AS ' |
| 78 | + 'st_mode,dir_entries.inode AS st_ino,COUNT(links.child_entry)AS ' |
| 79 | + 'st_nlink,:creation AS st_ctime,dir_entries.access AS st_atime,' |
| 80 | + 'dir_entries.modification AS st_mtime,COALESCE(files.size,0)AS ' |
| 81 | + 'st_size,COALESCE(files.size,0)AS size FROM dir_entries LEFT JOIN' |
| 82 | + ' files ON dir_entries.inode==files.inode LEFT JOIN links ON ' |
| 83 | + 'dir_entries.inode==links.child_entry WHERE dir_entries.inode==' |
| 84 | + ':inode GROUP BY dir_entries.inode LIMIT 1') |
| 85 | + |
| 86 | + |
| 87 | +class Test_GetColumns(Test_SQL): |
| 88 | + def test_getcolumns1(self): |
| 89 | + columns = getcolumns(tokenize(self.sql)) |
| 90 | + self.assertEqual(columns, []) |
| 91 | + |
| 92 | + def test_getcolumns2(self): |
| 93 | + columns = getcolumns(tokenize(self.sql2)) |
| 94 | + self.assertEqual(columns, ['child_entry', 'inode', 'creation']) |
| 95 | + |
| 96 | + def test_getcolumns3(self): |
| 97 | + columns = getcolumns(tokenize(self.sql3)) |
| 98 | + self.assertEqual(columns, ['st_dev', 'st_uid', 'st_gid', 'st_mode', |
| 99 | + 'st_ino', 'st_nlink', 'st_ctime', |
| 100 | + 'st_atime', 'st_mtime', 'st_size', 'size']) |
| 101 | + |
| 102 | + |
| 103 | +class Test_GetLimit(Test_SQL): |
| 104 | + def test_getlimit1(self): |
| 105 | + limit = getlimit(tokenize(self.sql)) |
| 106 | + self.assertEqual(limit, 1) |
| 107 | + |
| 108 | + def test_getlimit2(self): |
| 109 | + limit = getlimit(tokenize(self.sql2)) |
| 110 | + self.assertEqual(limit, 1) |
| 111 | + |
| 112 | + def test_getlimit3(self): |
| 113 | + limit = getlimit(tokenize(self.sql3)) |
| 114 | + self.assertEqual(limit, 1) |
| 115 | + |
| 116 | + |
| 117 | +class Test_IsType(Test_SQL): |
| 118 | + def test_istype2(self): |
| 119 | + stream = tokenize(self.sql2) |
| 120 | + self.assertTrue(IsType('SELECT')(stream)) |
| 121 | + |
| 122 | + stream = tokenize(self.sql2) |
| 123 | + self.assertFalse(IsType('INSERT')(stream)) |
| 124 | + |
| 125 | + |
| 126 | +if __name__ == "__main__": |
| 127 | + #import sys;sys.argv = ['', 'Test.testName'] |
| 128 | + main() |
0 commit comments