18
18
class TestConversion (base .PyMySQLTestCase ):
19
19
def test_datatypes (self ):
20
20
""" test every data type """
21
- conn = self .connections [ 0 ]
21
+ conn = self .connect ()
22
22
c = conn .cursor ()
23
23
c .execute ("create table test_datatypes (b bit, i int, l bigint, f real, s varchar(32), u varchar(32), bb blob, d date, dt datetime, ts timestamp, td time, t time, st datetime)" )
24
24
try :
@@ -57,7 +57,7 @@ def test_datatypes(self):
57
57
58
58
def test_dict (self ):
59
59
""" test dict escaping """
60
- conn = self .connections [ 0 ]
60
+ conn = self .connect ()
61
61
c = conn .cursor ()
62
62
c .execute ("create table test_dict (a integer, b integer, c integer)" )
63
63
try :
@@ -68,7 +68,7 @@ def test_dict(self):
68
68
c .execute ("drop table test_dict" )
69
69
70
70
def test_string (self ):
71
- conn = self .connections [ 0 ]
71
+ conn = self .connect ()
72
72
c = conn .cursor ()
73
73
c .execute ("create table test_dict (a text)" )
74
74
test_value = "I am a test string"
@@ -80,7 +80,7 @@ def test_string(self):
80
80
c .execute ("drop table test_dict" )
81
81
82
82
def test_integer (self ):
83
- conn = self .connections [ 0 ]
83
+ conn = self .connect ()
84
84
c = conn .cursor ()
85
85
c .execute ("create table test_dict (a integer)" )
86
86
test_value = 12345
@@ -94,7 +94,7 @@ def test_integer(self):
94
94
def test_binary (self ):
95
95
"""test binary data"""
96
96
data = bytes (bytearray (range (255 )))
97
- conn = self .connections [ 0 ]
97
+ conn = self .connect ()
98
98
self .safe_create_table (
99
99
conn , "test_binary" , "create table test_binary (b binary(255))" )
100
100
@@ -106,7 +106,7 @@ def test_binary(self):
106
106
def test_blob (self ):
107
107
"""test blob data"""
108
108
data = bytes (bytearray (range (256 )) * 4 )
109
- conn = self .connections [ 0 ]
109
+ conn = self .connect ()
110
110
self .safe_create_table (
111
111
conn , "test_blob" , "create table test_blob (b blob)" )
112
112
@@ -117,7 +117,7 @@ def test_blob(self):
117
117
118
118
def test_untyped (self ):
119
119
""" test conversion of null, empty string """
120
- conn = self .connections [ 0 ]
120
+ conn = self .connect ()
121
121
c = conn .cursor ()
122
122
c .execute ("select null,''" )
123
123
self .assertEqual ((None ,u'' ), c .fetchone ())
@@ -126,7 +126,7 @@ def test_untyped(self):
126
126
127
127
def test_timedelta (self ):
128
128
""" test timedelta conversion """
129
- conn = self .connections [ 0 ]
129
+ conn = self .connect ()
130
130
c = conn .cursor ()
131
131
c .execute ("select time('12:30'), time('23:12:59'), time('23:12:59.05100'), time('-12:30'), time('-23:12:59'), time('-23:12:59.05100'), time('-00:30')" )
132
132
self .assertEqual ((datetime .timedelta (0 , 45000 ),
@@ -141,7 +141,7 @@ def test_timedelta(self):
141
141
def test_datetime_microseconds (self ):
142
142
""" test datetime conversion w microseconds"""
143
143
144
- conn = self .connections [ 0 ]
144
+ conn = self .connect ()
145
145
if not self .mysql_server_is (conn , (5 , 6 , 4 )):
146
146
raise SkipTest ("target backend does not support microseconds" )
147
147
c = conn .cursor ()
@@ -206,15 +206,15 @@ class TestCursor(base.PyMySQLTestCase):
206
206
# ('max_updates', 3, 1, 11, 11, 0, 0),
207
207
# ('max_connections', 3, 1, 11, 11, 0, 0),
208
208
# ('max_user_connections', 3, 1, 11, 11, 0, 0))
209
- # conn = self.connections[0]
209
+ # conn = self.connect()
210
210
# c = conn.cursor()
211
211
# c.execute("select * from mysql.user")
212
212
#
213
213
# self.assertEqual(r, c.description)
214
214
215
215
def test_fetch_no_result (self ):
216
216
""" test a fetchone() with no rows """
217
- conn = self .connections [ 0 ]
217
+ conn = self .connect ()
218
218
c = conn .cursor ()
219
219
c .execute ("create table test_nr (b varchar(32))" )
220
220
try :
@@ -226,7 +226,7 @@ def test_fetch_no_result(self):
226
226
227
227
def test_aggregates (self ):
228
228
""" test aggregate functions """
229
- conn = self .connections [ 0 ]
229
+ conn = self .connect ()
230
230
c = conn .cursor ()
231
231
try :
232
232
c .execute ('create table test_aggregates (i integer)' )
@@ -240,7 +240,7 @@ def test_aggregates(self):
240
240
241
241
def test_single_tuple (self ):
242
242
""" test a single tuple """
243
- conn = self .connections [ 0 ]
243
+ conn = self .connect ()
244
244
c = conn .cursor ()
245
245
self .safe_create_table (
246
246
conn , 'mystuff' ,
@@ -283,7 +283,7 @@ class TestBulkInserts(base.PyMySQLTestCase):
283
283
284
284
def setUp (self ):
285
285
super (TestBulkInserts , self ).setUp ()
286
- self .conn = conn = self .connections [ 0 ]
286
+ self .conn = conn = self .connect ()
287
287
c = conn .cursor (self .cursor_type )
288
288
289
289
# create a table ane some data to query
@@ -299,14 +299,14 @@ def setUp(self):
299
299
""" )
300
300
301
301
def _verify_records (self , data ):
302
- conn = self .connections [ 0 ]
302
+ conn = self .connect ()
303
303
cursor = conn .cursor ()
304
304
cursor .execute ("SELECT id, name, age, height from bulkinsert" )
305
305
result = cursor .fetchall ()
306
306
self .assertEqual (sorted (data ), sorted (result ))
307
307
308
308
def test_bulk_insert (self ):
309
- conn = self .connections [ 0 ]
309
+ conn = self .connect ()
310
310
cursor = conn .cursor ()
311
311
312
312
data = [(0 , "bob" , 21 , 123 ), (1 , "jim" , 56 , 45 ), (2 , "fred" , 100 , 180 )]
@@ -320,7 +320,7 @@ def test_bulk_insert(self):
320
320
self ._verify_records (data )
321
321
322
322
def test_bulk_insert_multiline_statement (self ):
323
- conn = self .connections [ 0 ]
323
+ conn = self .connect ()
324
324
cursor = conn .cursor ()
325
325
data = [(0 , "bob" , 21 , 123 ), (1 , "jim" , 56 , 45 ), (2 , "fred" , 100 , 180 )]
326
326
cursor .executemany ("""insert
@@ -344,7 +344,7 @@ def test_bulk_insert_multiline_statement(self):
344
344
self ._verify_records (data )
345
345
346
346
def test_bulk_insert_single_record (self ):
347
- conn = self .connections [ 0 ]
347
+ conn = self .connect ()
348
348
cursor = conn .cursor ()
349
349
data = [(0 , "bob" , 21 , 123 )]
350
350
cursor .executemany ("insert into bulkinsert (id, name, age, height) "
@@ -354,7 +354,7 @@ def test_bulk_insert_single_record(self):
354
354
355
355
def test_issue_288 (self ):
356
356
"""executemany should work with "insert ... on update" """
357
- conn = self .connections [ 0 ]
357
+ conn = self .connect ()
358
358
cursor = conn .cursor ()
359
359
data = [(0 , "bob" , 21 , 123 ), (1 , "jim" , 56 , 45 ), (2 , "fred" , 100 , 180 )]
360
360
cursor .executemany ("""insert
@@ -380,7 +380,7 @@ def test_issue_288(self):
380
380
self ._verify_records (data )
381
381
382
382
def test_warnings (self ):
383
- con = self .connections [ 0 ]
383
+ con = self .connect ()
384
384
cur = con .cursor ()
385
385
with warnings .catch_warnings (record = True ) as ws :
386
386
warnings .simplefilter ("always" )
0 commit comments