diff --git a/.travis.databases.json b/.travis.databases.json index f27349d..852209e 100644 --- a/.travis.databases.json +++ b/.travis.databases.json @@ -1,4 +1,4 @@ [ - {"host": "localhost", "user": "root", "passwd": "", "db": "test_pymysql", "use_unicode": true, "local_infile": true}, + {"host": "localhost", "user": "root", "passwd": "", "db": "test_pymysql", "use_unicode": true}, {"host": "localhost", "user": "root", "passwd": "", "db": "test_pymysql2" } ] diff --git a/.travis.yml b/.travis.yml index 1b9a0d9..e337c46 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,19 +1,21 @@ sudo: false language: python -python: "3.4" +python: + - "3.6" + - "3.5" + - "3.4" + - "2.7" + - "pypy" + env: - - TOX_ENV=py27 - - TOX_ENV=py33 - - TOX_ENV=py34 - - TOX_ENV=pypy - - TOX_ENV=pypy3 + - PYTHONIOENCODING=utf-8 install: - - pip install -U tox + - pip install -U tornado before_script: - "mysql -e 'create database test_pymysql DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;'" - "mysql -e 'create database test_pymysql2 DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;'" - cp .travis.databases.json tornado_mysql/tests/databases.json -script: tox -e $TOX_ENV +script: ./runtests.py diff --git a/MANIFEST.in b/MANIFEST.in index c57b5ea..c2442c6 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,3 +1,2 @@ include README.rst LICENSE CHANGELOG include runtests.py tox.ini -include example.py example_pool diff --git a/README.rst b/README.rst index 3262f46..6edfded 100644 --- a/README.rst +++ b/README.rst @@ -9,6 +9,24 @@ Tornado-MySQL This package contains a fork of PyMySQL supporting Tornado. + +WARNING +------- + +This library is experimental and unmaintained. Don't use for production unless you can fix problem yourself. + +If you think async is efficient, you're wrong. You shoud try thread pool before this. +See also: http://techspot.zzzeek.org/2015/02/15/asynchronous-python-and-databases/ + +I don't have motivation to maintain this library. I won't add new features. **Please don't send feature request.** +I'm very lazy about fix bugs. **Don't expect bugs are fixed when you want**. + +Instead, you should use your time and energy to port your project to asyncio and newest Python 3. +Please don't pay your time for this project. + +You can use aio-libs/aiomysql or ``run_in_executor()`` in asyncio. + + Example ------- diff --git a/example.py b/example/example.py similarity index 100% rename from example.py rename to example/example.py diff --git a/example_pool.py b/example/pool.py similarity index 100% rename from example_pool.py rename to example/pool.py diff --git a/example/pool2.py b/example/pool2.py new file mode 100644 index 0000000..f41313f --- /dev/null +++ b/example/pool2.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python +from __future__ import print_function + +import random +from tornado import ioloop, gen +from tornado_mysql import pools + + +pools.DEBUG = True + + +POOL = pools.Pool( + dict(host='127.0.0.1', port=3306, user='root', passwd='', db='mysql'), + max_idle_connections=2, + max_recycle_sec=3, + max_open_connections=5, +) + + +@gen.coroutine +def worker(n): + for i in range(20): + t = random.random() * 5 + print(n, "sleeping", t, "seconds") + cur = yield POOL.execute("SELECT SLEEP(%s)", (t,)) + print(n, cur.fetchall()) + yield gen.sleep(t) + + +@gen.coroutine +def main(): + workers = [worker(i) for i in range(10)] + yield workers + + +ioloop.IOLoop.current().run_sync(main) +print(POOL._opened_conns) + diff --git a/pymysql/tests/test_load_local.py b/pymysql/tests/test_load_local.py deleted file mode 100644 index 1115bb3..0000000 --- a/pymysql/tests/test_load_local.py +++ /dev/null @@ -1,68 +0,0 @@ -from pymysql import OperationalError, Warning -from pymysql.tests import base - -import os -import warnings - -__all__ = ["TestLoadLocal"] - - -class TestLoadLocal(base.PyMySQLTestCase): - def test_no_file(self): - """Test load local infile when the file does not exist""" - conn = self.connections[0] - c = conn.cursor() - c.execute("CREATE TABLE test_load_local (a INTEGER, b INTEGER)") - try: - self.assertRaises( - OperationalError, - c.execute, - ("LOAD DATA LOCAL INFILE 'no_data.txt' INTO TABLE " - "test_load_local fields terminated by ','") - ) - finally: - c.execute("DROP TABLE test_load_local") - c.close() - - def test_load_file(self): - """Test load local infile with a valid file""" - conn = self.connections[0] - c = conn.cursor() - c.execute("CREATE TABLE test_load_local (a INTEGER, b INTEGER)") - filename = os.path.join(os.path.dirname(os.path.realpath(__file__)), - 'data', - 'load_local_data.txt') - try: - c.execute( - ("LOAD DATA LOCAL INFILE '{0}' INTO TABLE " + - "test_load_local FIELDS TERMINATED BY ','").format(filename) - ) - c.execute("SELECT COUNT(*) FROM test_load_local") - self.assertEqual(22749, c.fetchone()[0]) - finally: - c.execute("DROP TABLE test_load_local") - - def test_load_warnings(self): - """Test load local infile produces the appropriate warnings""" - conn = self.connections[0] - c = conn.cursor() - c.execute("CREATE TABLE test_load_local (a INTEGER, b INTEGER)") - filename = os.path.join(os.path.dirname(os.path.realpath(__file__)), - 'data', - 'load_local_warn_data.txt') - try: - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter('always') - c.execute( - ("LOAD DATA LOCAL INFILE '{0}' INTO TABLE " + - "test_load_local FIELDS TERMINATED BY ','").format(filename) - ) - self.assertEqual(w[0].category, Warning) - self.assertTrue("Incorrect integer value" in str(w[-1].message)) - finally: - c.execute("DROP TABLE test_load_local") - - -if __name__ == "__main__": - import unittest - unittest.main() diff --git a/setup.py b/setup.py index b39f3d6..94d7a11 100755 --- a/setup.py +++ b/setup.py @@ -1,21 +1,14 @@ #!/usr/bin/env python from setuptools import setup, find_packages -try: - with open('README.rst') as f: - readme = f.read() -except IOError: - readme = '' - setup( name="Tornado-MySQL", - version="0.2", + version="0.5.1", url='https://github.com/PyMySQL/Tornado-MySQL', author='INADA Naoki', author_email='songofacandy@gmail.com', description='Pure Python MySQL Driver for Tornado', install_requires=['tornado>=4.0'], - long_description=readme, license="MIT", packages=find_packages(), classifiers=[ @@ -24,6 +17,7 @@ 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.3', 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: Implementation :: CPython', 'Programming Language :: Python :: Implementation :: PyPy', 'Development Status :: 3 - Alpha', diff --git a/tornado_mysql/connections.py b/tornado_mysql/connections.py index c266ed1..ef7e0b9 100644 --- a/tornado_mysql/connections.py +++ b/tornado_mysql/connections.py @@ -322,9 +322,6 @@ def is_resultset_packet(self): field_count = ord(self._data[0:1]) return 1 <= field_count <= 250 - def is_load_local_packet(self): - return self._data[0:1] == b'\xfb' - def is_error_packet(self): return self._data[0:1] == b'\xff' @@ -437,26 +434,6 @@ def __getattr__(self, key): return getattr(self.packet, key) -class LoadLocalPacketWrapper(object): - """ - Load Local Packet Wrapper. It uses an existing packet object, and wraps - around it, exposing useful variables while still providing access - to the original packet objects variables and methods. - """ - - def __init__(self, from_packet): - if not from_packet.is_load_local_packet(): - raise ValueError( - "Cannot create '{0}' object from invalid packet type".format( - self.__class__)) - - self.packet = from_packet - self.filename = self.packet.get_all_data()[1:] - if DEBUG: print("filename=", self.filename) - - def __getattr__(self, key): - return getattr(self.packet, key) - class Connection(object): """ @@ -476,8 +453,7 @@ def __init__(self, host="localhost", user=None, password="", client_flag=0, cursorclass=Cursor, init_command=None, connect_timeout=None, ssl=None, read_default_group=None, compress=None, named_pipe=None, no_delay=False, - autocommit=False, db=None, passwd=None, local_infile=False, - io_loop=None): + autocommit=False, db=None, passwd=None, io_loop=None): """ Establish a connection to the MySQL database. Accepts several arguments: @@ -511,7 +487,6 @@ def __init__(self, host="localhost", user=None, password="", no_delay: Disable Nagle's algorithm on the socket autocommit: Autocommit mode. None means use server default. (default: False) io_loop: Tornado IOLoop - local_infile: Boolean to enable the use of LOAD DATA LOCAL command. (default: False) db: Alias for database. (for compatibility to MySQLdb) passwd: Alias for password. (for compatibility to MySQLdb) @@ -529,9 +504,6 @@ def __init__(self, host="localhost", user=None, password="", if compress or named_pipe: raise NotImplementedError("compress and named_pipe arguments are not supported") - if local_infile: - client_flag |= CLIENT.LOCAL_FILES - if ssl and ('capath' in ssl or 'cipher' in ssl): raise NotImplementedError('ssl options capath and cipher are not supported') @@ -624,6 +596,9 @@ def close(self): @gen.coroutine def close_async(self): """Send the quit message and close the socket""" + if self._stream is None or self._stream.closed(): + self._stream = None + return send_data = struct.pack(' Future[connection] now = self.io_loop.time() + + # Try to reuse in free pool while self._free_conn: conn = self._free_conn.popleft() if now - conn.connected_time > self.max_recycle_sec: self._close_async(conn) continue - _debug("Reusing connection from pool (opened=%d)" % (self._opened_conns,)) + log.debug("Reusing connection from pool: %s", self.stat()) fut = Future() fut.set_result(conn) return fut - self._opened_conns += 1 - _debug("Creating new connection (opened=%d)" % (self._opened_conns,)) - return connect(**self.connect_kwargs) + # Open new connection + if self.max_open == 0 or self._opened_conns < self.max_open: + self._opened_conns += 1 + log.debug("Creating new connection: %s", self.stat()) + fut = connect(**self.connect_kwargs) + fut.add_done_callback(self._on_connect) # self._opened_conns -=1 on exception + return fut + + # Wait to other connection is released. + fut = Future() + self._waitings.append(fut) + return fut + + def _on_connect(self, fut): + if fut.exception(): + self._opened_conns -= 1 def _put_conn(self, conn): - if (len(self._free_conn) < self.max_idle_connections and + if (len(self._free_conn) < self.max_idle and self.io_loop.time() - conn.connected_time < self.max_recycle_sec): - self._free_conn.append(conn) + if self._waitings: + fut = self._waitings.popleft() + fut.set_result(conn) + log.debug("Passing returned connection to waiter: %s", self.stat()) + else: + self._free_conn.append(conn) + log.debug("Add conn to free pool: %s", self.stat()) else: self._close_async(conn) def _close_async(self, conn): - self.io_loop.add_future(conn.close_async(), callback=lambda f: None) - self._opened_conns -= 1 + self.io_loop.add_future(conn.close_async(), callback=self._after_close) def _close_conn(self, conn): conn.close() - self._opened_conns -= 1 + self._after_close() + + def _after_close(self, fut=None): + if self._waitings: + fut = self._waitings.popleft() + conn = Connection(**self.connect_kwargs) + cf = conn.connect() + self.io_loop.add_future(cf, callback=lambda f: fut.set_result(conn)) + else: + self._opened_conns -= 1 + log.debug("Connection closed: %s", self.stat()) @coroutine - def execute(self, query, params=None): + def execute(self, query, params=None, cursor=None): """Execute query in pool. Returns future yielding closed cursor. You can get rows, lastrowid, etc from the cursor. + :param cursor: cursor class(Cursor, DictCursor. etc.) :return: Future of cursor :rtype: Future """ conn = yield self._get_conn() try: - cur = conn.cursor() + cur = conn.cursor(cursor) yield cur.execute(query, params) yield cur.close() - self._put_conn(conn) except: - self._opened_conns -= 1 - conn.close() + self._close_conn(conn) raise + else: + self._put_conn(conn) raise Return(cur) @coroutine @@ -111,6 +148,11 @@ def begin(self): :rtype: Future """ conn = yield self._get_conn() + try: + yield conn.begin() + except: + self._close_conn(conn) + raise trx = Transaction(self, conn) raise Return(trx) @@ -130,7 +172,7 @@ def _close(self): self._pool = self._conn = None @coroutine - def execute(self, query, args): + def execute(self, query, args=None): """ :return: Future[Cursor] :rtype: Future @@ -155,4 +197,5 @@ def rollback(self): def __del__(self): if self._pool is not None: warnings.warn("Transaction has not committed or rollbacked.") + log.warn("Transaction has not committed or rollbacked.") self._pool._close_conn(self._conn) diff --git a/tornado_mysql/tests/base.py b/tornado_mysql/tests/base.py index 58ae292..1eb3dcb 100644 --- a/tornado_mysql/tests/base.py +++ b/tornado_mysql/tests/base.py @@ -22,7 +22,7 @@ class PyMySQLTestCase(AsyncTestCase): else: databases = [ {"host":"localhost","user":"root", - "passwd":"","db":"test_pymysql", "use_unicode": True, 'local_infile': True}, + "passwd":"","db":"test_pymysql", "use_unicode": True}, {"host":"localhost","user":"root","passwd":"","db":"test_pymysql2"}] @gen.coroutine diff --git a/pymysql/tests/data/load_local_data.txt b/tornado_mysql/tests/data/load_local_data.txt similarity index 100% rename from pymysql/tests/data/load_local_data.txt rename to tornado_mysql/tests/data/load_local_data.txt diff --git a/pymysql/tests/data/load_local_warn_data.txt b/tornado_mysql/tests/data/load_local_warn_data.txt similarity index 100% rename from pymysql/tests/data/load_local_warn_data.txt rename to tornado_mysql/tests/data/load_local_warn_data.txt diff --git a/pymysql/tests/test_cursor.py b/tornado_mysql/tests/test_cursor.py similarity index 100% rename from pymysql/tests/test_cursor.py rename to tornado_mysql/tests/test_cursor.py