|
@@ -8,8 +8,8 @@ __author__ = '[email protected] (Herman Sheremetyev)'
|
|
|
|
|
|
import time
|
|
import time
|
|
import unittest
|
|
import unittest
|
|
-from dbtextdb import *
|
|
|
|
-
|
|
|
|
|
|
+from dbtextdb import DBText
|
|
|
|
+from dbtextdb import ParseError, ExecuteError
|
|
|
|
|
|
class DBTextTest(unittest.TestCase):
|
|
class DBTextTest(unittest.TestCase):
|
|
|
|
|
|
@@ -25,28 +25,28 @@ class DBTextTest(unittest.TestCase):
|
|
# normal query
|
|
# normal query
|
|
query_normal = 'select * from subscriber;'
|
|
query_normal = 'select * from subscriber;'
|
|
db_conn.ParseQuery(query_normal)
|
|
db_conn.ParseQuery(query_normal)
|
|
- self.assert_(db_conn.command == 'SELECT')
|
|
|
|
- self.assert_(db_conn.table == 'subscriber')
|
|
|
|
- self.assert_(db_conn.columns == ['*'])
|
|
|
|
|
|
+ self.assertEqual(db_conn.command, 'SELECT')
|
|
|
|
+ self.assertEqual(db_conn.table, 'subscriber')
|
|
|
|
+ self.assertEqual(db_conn.columns, ['*'])
|
|
db_conn.CleanUp()
|
|
db_conn.CleanUp()
|
|
# normal query with condition
|
|
# normal query with condition
|
|
query_normal_cond = 'select * from subscriber where column="value";'
|
|
query_normal_cond = 'select * from subscriber where column="value";'
|
|
db_conn.ParseQuery(query_normal_cond)
|
|
db_conn.ParseQuery(query_normal_cond)
|
|
- self.assert_(db_conn.command == 'SELECT')
|
|
|
|
- self.assert_(db_conn.table == 'subscriber')
|
|
|
|
- self.assert_(db_conn.columns == ['*'])
|
|
|
|
- self.assert_(db_conn.strings == ['value'])
|
|
|
|
- self.assert_(not db_conn.count)
|
|
|
|
- self.assert_(db_conn.conditions == {'column': 'value'})
|
|
|
|
|
|
+ self.assertEqual(db_conn.command, 'SELECT')
|
|
|
|
+ self.assertEqual(db_conn.table, 'subscriber')
|
|
|
|
+ self.assertEqual(db_conn.columns, ['*'])
|
|
|
|
+ self.assertEqual(db_conn.strings, ['value'])
|
|
|
|
+ self.assertTrue(not db_conn.count)
|
|
|
|
+ self.assertEqual(db_conn.conditions, {'column': 'value'})
|
|
db_conn.CleanUp()
|
|
db_conn.CleanUp()
|
|
# normal query with multiple conditions
|
|
# normal query with multiple conditions
|
|
query_normal_cond = ('select * from subscriber where column="value1" and '
|
|
query_normal_cond = ('select * from subscriber where column="value1" and '
|
|
'col2=" another value " and col3= foo and a="";')
|
|
'col2=" another value " and col3= foo and a="";')
|
|
db_conn.ParseQuery(query_normal_cond)
|
|
db_conn.ParseQuery(query_normal_cond)
|
|
- self.assert_(db_conn.command == 'SELECT')
|
|
|
|
- self.assert_(db_conn.table == 'subscriber')
|
|
|
|
- self.assert_(db_conn.columns == ['*'])
|
|
|
|
- self.assert_(db_conn.strings == ['value1', ' another value ', ''])
|
|
|
|
|
|
+ self.assertEqual(db_conn.command, 'SELECT')
|
|
|
|
+ self.assertEqual(db_conn.table, 'subscriber')
|
|
|
|
+ self.assertEqual(db_conn.columns, ['*'])
|
|
|
|
+ self.assertEqual(db_conn.strings, ['value1', ' another value ', ''])
|
|
self.assertEqual(db_conn.conditions, {'column': 'value1',
|
|
self.assertEqual(db_conn.conditions, {'column': 'value1',
|
|
'col2': ' another value ',
|
|
'col2': ' another value ',
|
|
'col3': 'foo', 'a': ''})
|
|
'col3': 'foo', 'a': ''})
|
|
@@ -54,49 +54,49 @@ class DBTextTest(unittest.TestCase):
|
|
# normal query with count
|
|
# normal query with count
|
|
query_normal_count = 'select count(*) from subscriber;'
|
|
query_normal_count = 'select count(*) from subscriber;'
|
|
db_conn.ParseQuery(query_normal_count)
|
|
db_conn.ParseQuery(query_normal_count)
|
|
- self.assert_(db_conn.command == 'SELECT')
|
|
|
|
- self.assert_(db_conn.table == 'subscriber')
|
|
|
|
- self.assert_(db_conn.columns == ['*'])
|
|
|
|
- self.assert_(db_conn.count == True)
|
|
|
|
|
|
+ self.assertEqual(db_conn.command, 'SELECT')
|
|
|
|
+ self.assertEqual(db_conn.table, 'subscriber')
|
|
|
|
+ self.assertEqual(db_conn.columns, ['*'])
|
|
|
|
+ self.assertEqual(db_conn.count, True)
|
|
db_conn.CleanUp()
|
|
db_conn.CleanUp()
|
|
# normal query with now()
|
|
# normal query with now()
|
|
query_normal_count = 'select count(*) from subscriber where time=now();'
|
|
query_normal_count = 'select count(*) from subscriber where time=now();'
|
|
db_conn.ParseQuery(query_normal_count)
|
|
db_conn.ParseQuery(query_normal_count)
|
|
- self.assert_(db_conn.command == 'SELECT')
|
|
|
|
- self.assert_(db_conn.table == 'subscriber')
|
|
|
|
- self.assert_(db_conn.columns == ['*'])
|
|
|
|
- self.assert_(db_conn.count == True)
|
|
|
|
|
|
+ self.assertEqual(db_conn.command, 'SELECT')
|
|
|
|
+ self.assertEqual(db_conn.table, 'subscriber')
|
|
|
|
+ self.assertEqual(db_conn.columns, ['*'])
|
|
|
|
+ self.assertEqual(db_conn.count, True)
|
|
self.assertEqual(db_conn.conditions, {'time': self.time_now})
|
|
self.assertEqual(db_conn.conditions, {'time': self.time_now})
|
|
db_conn.CleanUp()
|
|
db_conn.CleanUp()
|
|
# normal delete query
|
|
# normal delete query
|
|
query_normal_delete = 'delete from subscriber where foo = 2;'
|
|
query_normal_delete = 'delete from subscriber where foo = 2;'
|
|
db_conn.ParseQuery(query_normal_delete)
|
|
db_conn.ParseQuery(query_normal_delete)
|
|
- self.assert_(db_conn.command == 'DELETE')
|
|
|
|
- self.assert_(db_conn.table == 'subscriber')
|
|
|
|
|
|
+ self.assertEqual(db_conn.command, 'DELETE')
|
|
|
|
+ self.assertEqual(db_conn.table, 'subscriber')
|
|
self.assertEqual(db_conn.conditions, {'foo': '2'})
|
|
self.assertEqual(db_conn.conditions, {'foo': '2'})
|
|
db_conn.CleanUp()
|
|
db_conn.CleanUp()
|
|
# normal insert values query with no into
|
|
# normal insert values query with no into
|
|
query_normal_insert_values = ('insert subscriber (col1, col2, col3) '
|
|
query_normal_insert_values = ('insert subscriber (col1, col2, col3) '
|
|
'values (1, "foo", "");')
|
|
'values (1, "foo", "");')
|
|
db_conn.ParseQuery(query_normal_insert_values)
|
|
db_conn.ParseQuery(query_normal_insert_values)
|
|
- self.assert_(db_conn.command == 'INSERT')
|
|
|
|
- self.assert_(db_conn.table == 'subscriber')
|
|
|
|
|
|
+ self.assertEqual(db_conn.command, 'INSERT')
|
|
|
|
+ self.assertEqual(db_conn.table, 'subscriber')
|
|
self.assertEqual(db_conn.targets, {'col1': '1', 'col2': 'foo', 'col3': ''})
|
|
self.assertEqual(db_conn.targets, {'col1': '1', 'col2': 'foo', 'col3': ''})
|
|
db_conn.CleanUp()
|
|
db_conn.CleanUp()
|
|
# normal insert values query with into
|
|
# normal insert values query with into
|
|
query_normal_insert_into_values = ('insert into subscriber (col1, col2) '
|
|
query_normal_insert_into_values = ('insert into subscriber (col1, col2) '
|
|
'values (1, "foo");')
|
|
'values (1, "foo");')
|
|
db_conn.ParseQuery(query_normal_insert_into_values)
|
|
db_conn.ParseQuery(query_normal_insert_into_values)
|
|
- self.assert_(db_conn.command == 'INSERT')
|
|
|
|
- self.assert_(db_conn.table == 'subscriber')
|
|
|
|
|
|
+ self.assertEqual(db_conn.command, 'INSERT')
|
|
|
|
+ self.assertEqual(db_conn.table, 'subscriber')
|
|
self.assertEqual(db_conn.targets, {'col1': '1', 'col2': 'foo'})
|
|
self.assertEqual(db_conn.targets, {'col1': '1', 'col2': 'foo'})
|
|
db_conn.CleanUp()
|
|
db_conn.CleanUp()
|
|
# normal insert values query with now()
|
|
# normal insert values query with now()
|
|
query_normal_insert_into_values = ('insert into subscriber (a, b, c) '
|
|
query_normal_insert_into_values = ('insert into subscriber (a, b, c) '
|
|
'values (NOW(), "foo", now());')
|
|
'values (NOW(), "foo", now());')
|
|
db_conn.ParseQuery(query_normal_insert_into_values)
|
|
db_conn.ParseQuery(query_normal_insert_into_values)
|
|
- self.assert_(db_conn.command == 'INSERT')
|
|
|
|
- self.assert_(db_conn.table == 'subscriber')
|
|
|
|
|
|
+ self.assertEqual(db_conn.command, 'INSERT')
|
|
|
|
+ self.assertEqual(db_conn.table, 'subscriber')
|
|
self.assertEqual(db_conn.targets, {'a': self.time_now, 'b': 'foo',
|
|
self.assertEqual(db_conn.targets, {'a': self.time_now, 'b': 'foo',
|
|
'c': self.time_now})
|
|
'c': self.time_now})
|
|
db_conn.CleanUp()
|
|
db_conn.CleanUp()
|
|
@@ -157,23 +157,23 @@ class DBTextTest(unittest.TestCase):
|
|
# normal insert set query with no into
|
|
# normal insert set query with no into
|
|
query_normal_insert_set = ('insert subscriber set col= 1, col2 ="\'f\'b";')
|
|
query_normal_insert_set = ('insert subscriber set col= 1, col2 ="\'f\'b";')
|
|
db_conn.ParseQuery(query_normal_insert_set)
|
|
db_conn.ParseQuery(query_normal_insert_set)
|
|
- self.assert_(db_conn.command == 'INSERT')
|
|
|
|
- self.assert_(db_conn.table == 'subscriber')
|
|
|
|
|
|
+ self.assertEqual(db_conn.command, 'INSERT')
|
|
|
|
+ self.assertEqual(db_conn.table, 'subscriber')
|
|
self.assertEqual(db_conn.targets, {'col': '1', 'col2': '\'f\'b'})
|
|
self.assertEqual(db_conn.targets, {'col': '1', 'col2': '\'f\'b'})
|
|
db_conn.CleanUp()
|
|
db_conn.CleanUp()
|
|
# normal update
|
|
# normal update
|
|
query_normal_update = ('update subscriber set col1= 1, col2 ="foo";')
|
|
query_normal_update = ('update subscriber set col1= 1, col2 ="foo";')
|
|
db_conn.ParseQuery(query_normal_update)
|
|
db_conn.ParseQuery(query_normal_update)
|
|
- self.assert_(db_conn.command == 'UPDATE')
|
|
|
|
- self.assert_(db_conn.table == 'subscriber')
|
|
|
|
|
|
+ self.assertEqual(db_conn.command, 'UPDATE')
|
|
|
|
+ self.assertEqual(db_conn.table, 'subscriber')
|
|
self.assertEqual(db_conn.targets, {'col1': '1', 'col2': 'foo'})
|
|
self.assertEqual(db_conn.targets, {'col1': '1', 'col2': 'foo'})
|
|
db_conn.CleanUp()
|
|
db_conn.CleanUp()
|
|
# normal update with condition
|
|
# normal update with condition
|
|
query_normal_update_cond = ('update subscriber set col1= 1, col2 ="foo" '
|
|
query_normal_update_cond = ('update subscriber set col1= 1, col2 ="foo" '
|
|
'where foo = "bar" and id=1 and a="";')
|
|
'where foo = "bar" and id=1 and a="";')
|
|
db_conn.ParseQuery(query_normal_update_cond)
|
|
db_conn.ParseQuery(query_normal_update_cond)
|
|
- self.assert_(db_conn.command == 'UPDATE')
|
|
|
|
- self.assert_(db_conn.table == 'subscriber')
|
|
|
|
|
|
+ self.assertEqual(db_conn.command, 'UPDATE')
|
|
|
|
+ self.assertEqual(db_conn.table, 'subscriber')
|
|
self.assertEqual(db_conn.targets, {'col1': '1', 'col2': 'foo'})
|
|
self.assertEqual(db_conn.targets, {'col1': '1', 'col2': 'foo'})
|
|
self.assertEqual(db_conn.conditions, {'foo': 'bar', 'id': '1', 'a': ''})
|
|
self.assertEqual(db_conn.conditions, {'foo': 'bar', 'id': '1', 'a': ''})
|
|
db_conn.CleanUp()
|
|
db_conn.CleanUp()
|
|
@@ -220,49 +220,49 @@ class DBTextTest(unittest.TestCase):
|
|
# normal query with multiple columns
|
|
# normal query with multiple columns
|
|
query_normal_count = 'select col1, "col 2",col3 , "col4" from subscriber;'
|
|
query_normal_count = 'select col1, "col 2",col3 , "col4" from subscriber;'
|
|
db_conn.ParseQuery(query_normal_count)
|
|
db_conn.ParseQuery(query_normal_count)
|
|
- self.assert_(db_conn.command == 'SELECT')
|
|
|
|
- self.assert_(db_conn.table == 'subscriber')
|
|
|
|
- self.assert_(db_conn.strings == ['col 2', 'col4'])
|
|
|
|
- self.assert_(db_conn.columns == ['col1', "'col 2'", 'col3', "'col4'"])
|
|
|
|
|
|
+ self.assertEqual(db_conn.command, 'SELECT')
|
|
|
|
+ self.assertEqual(db_conn.table, 'subscriber')
|
|
|
|
+ self.assertEqual(db_conn.strings, ['col 2', 'col4'])
|
|
|
|
+ self.assertEqual(db_conn.columns, ['col1', "'col 2'", 'col3', "'col4'"])
|
|
db_conn.CleanUp()
|
|
db_conn.CleanUp()
|
|
# normal query with ORDER BY
|
|
# normal query with ORDER BY
|
|
query_normal_order_by = ('select col1, col2 from test'
|
|
query_normal_order_by = ('select col1, col2 from test'
|
|
' ORDER by col1;')
|
|
' ORDER by col1;')
|
|
db_conn.ParseQuery(query_normal_order_by)
|
|
db_conn.ParseQuery(query_normal_order_by)
|
|
- self.assert_(db_conn.command == 'SELECT')
|
|
|
|
- self.assert_(db_conn.table == 'test')
|
|
|
|
- self.assert_(db_conn.columns == ['col1', 'col2'])
|
|
|
|
- self.assert_(db_conn.order_by == 'col1')
|
|
|
|
|
|
+ self.assertEqual(db_conn.command, 'SELECT')
|
|
|
|
+ self.assertEqual(db_conn.table, 'test')
|
|
|
|
+ self.assertEqual(db_conn.columns, ['col1', 'col2'])
|
|
|
|
+ self.assertEqual(db_conn.order_by, 'col1')
|
|
db_conn.CleanUp()
|
|
db_conn.CleanUp()
|
|
# normal query with ORDER BY with conditions
|
|
# normal query with ORDER BY with conditions
|
|
query_normal_order_by_cond = ('select col1, col2 from test where col="asdf"'
|
|
query_normal_order_by_cond = ('select col1, col2 from test where col="asdf"'
|
|
' and col2 = "foo" ORDER by col;')
|
|
' and col2 = "foo" ORDER by col;')
|
|
db_conn.ParseQuery(query_normal_order_by_cond)
|
|
db_conn.ParseQuery(query_normal_order_by_cond)
|
|
- self.assert_(db_conn.command == 'SELECT')
|
|
|
|
- self.assert_(db_conn.table == 'test')
|
|
|
|
- self.assert_(db_conn.columns == ['col1', 'col2'])
|
|
|
|
- self.assert_(db_conn.conditions == {'col': 'asdf', 'col2': 'foo'})
|
|
|
|
- self.assert_(db_conn.order_by == 'col')
|
|
|
|
|
|
+ self.assertEqual(db_conn.command, 'SELECT')
|
|
|
|
+ self.assertEqual(db_conn.table, 'test')
|
|
|
|
+ self.assertEqual(db_conn.columns, ['col1', 'col2'])
|
|
|
|
+ self.assertEqual(db_conn.conditions, {'col': 'asdf', 'col2': 'foo'})
|
|
|
|
+ self.assertEqual(db_conn.order_by, 'col')
|
|
db_conn.CleanUp()
|
|
db_conn.CleanUp()
|
|
# normal query with CONCAT
|
|
# normal query with CONCAT
|
|
query_normal_concat = ('select concat(uname,"@", domain) as email_addr '
|
|
query_normal_concat = ('select concat(uname,"@", domain) as email_addr '
|
|
'from subscriber where id=3;')
|
|
'from subscriber where id=3;')
|
|
db_conn.ParseQuery(query_normal_concat)
|
|
db_conn.ParseQuery(query_normal_concat)
|
|
- self.assert_(db_conn.command == 'SELECT')
|
|
|
|
- self.assert_(db_conn.table == 'subscriber')
|
|
|
|
- self.assert_(db_conn.columns == ['email_addr'])
|
|
|
|
- self.assert_(db_conn.conditions == {'id': '3'})
|
|
|
|
- self.assert_(db_conn.aliases == {'email_addr': ['uname', "'@'", 'domain']})
|
|
|
|
|
|
+ self.assertEqual(db_conn.command, 'SELECT')
|
|
|
|
+ self.assertEqual(db_conn.table, 'subscriber')
|
|
|
|
+ self.assertEqual(db_conn.columns, ['email_addr'])
|
|
|
|
+ self.assertEqual(db_conn.conditions, {'id': '3'})
|
|
|
|
+ self.assertEqual(db_conn.aliases, {'email_addr': ['uname', "'@'", 'domain']})
|
|
db_conn.CleanUp()
|
|
db_conn.CleanUp()
|
|
# normal query with multiple CONCAT
|
|
# normal query with multiple CONCAT
|
|
query_normal_mult_concat = ('select concat(uname,"@", domain) as email,'
|
|
query_normal_mult_concat = ('select concat(uname,"@", domain) as email,'
|
|
' foo as "bar" from table where id=3;')
|
|
' foo as "bar" from table where id=3;')
|
|
db_conn.ParseQuery(query_normal_mult_concat)
|
|
db_conn.ParseQuery(query_normal_mult_concat)
|
|
- self.assert_(db_conn.command == 'SELECT')
|
|
|
|
- self.assert_(db_conn.table == 'table')
|
|
|
|
- self.assert_(db_conn.columns == ['email', "'bar'"])
|
|
|
|
- self.assert_(db_conn.conditions == {'id': '3'})
|
|
|
|
- self.assert_(db_conn.aliases == {"'bar'": ['foo'],
|
|
|
|
|
|
+ self.assertEqual(db_conn.command, 'SELECT')
|
|
|
|
+ self.assertEqual(db_conn.table, 'table')
|
|
|
|
+ self.assertEqual(db_conn.columns, ['email', "'bar'"])
|
|
|
|
+ self.assertEqual(db_conn.conditions, {'id': '3'})
|
|
|
|
+ self.assertTrue(db_conn.aliases == {"'bar'": ['foo'],
|
|
'email': ['uname', "'@'", 'domain']})
|
|
'email': ['uname', "'@'", 'domain']})
|
|
db_conn.CleanUp()
|
|
db_conn.CleanUp()
|
|
# bad query with CONCAT missing AS
|
|
# bad query with CONCAT missing AS
|