diff --git a/.gitignore b/.gitignore index 342a348..e6b16b7 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,25 @@ pylint_report.txt .tox/* .pytest_cache/* .python-version + # Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + # C extensions +*.so + # Distribution / packaging +.Python + # Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ + +\.vscode/settings\.json + +\.vscode/ diff --git a/.python-version b/.python-version deleted file mode 100644 index ecc17b8..0000000 --- a/.python-version +++ /dev/null @@ -1 +0,0 @@ -2.7.13 diff --git a/.talismanrc b/.talismanrc new file mode 100644 index 0000000..5182285 --- /dev/null +++ b/.talismanrc @@ -0,0 +1,4 @@ +fileignoreconfig: +- filename: .travis.yml + checksum: 1f8ad739036010977c7663abd2a9348f00e7a25424f550f7d0cfc26423e667e5 + ignore_detectors: [] diff --git a/.travis.yml b/.travis.yml index 0e9beaf..8ba408d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,17 +1,35 @@ language: python sudo: required +env: + global: + CODECOV_TOKEN: + secure: ZXkVg6JLVPav6OJPzfUgIGD64e85N92tgpXA2nymIHfucCTGC5B4yniCTD49jz6xET1m1qRb04lvomCiQ7PeDoBW/LgJLIjXyJOW+2iA9VA3MdxwQF8Teu5W1J34wH4dm6nfn0KCNxhYRDTBkDgvXeUcXP5nNlo9w3sL+FKbAdXucwlL8ytcJXf641YhuGQpIrh1XGioBSNJ54BTRDXQXcRW76XaltxHCsbEv+fLH4yuahJdCTGjsr9cGdygAlo3FcLsqgkcjFCoNjg5UgBcPN8QPfWAppeIrmLRCq/q+p5KH2awPYqH0BL8jdTTmFElyGLmQBNnB6R5tI5HIx7+OCsw79mhXPVRgn3xkRj0OWRjzYlA+vW8JM2rEepixs9CRWtZJdC72oe1aytFb2cyVDfmotLwyuUqFI2ieQkyHgj0OLl1n1tcicRo8eS5RIB8mYicCm29lDrs/J6TFWSl/VqNUtZU+y54I/lv/fiFbRVjtMZ+PdwGHoigaVaIKeWe1TmlGWun7bh4Ov3jz62WtBlvuhz3LHMYD8OIuijot0HHqWsC1mlmZUvKoeSDYFXNLBBSAwMkkAfxxQM0PhMG3qUUirkd5xPJyBh1n8d4/KQzrkTblI7QzZgCwJE1r1L99XMs2/Ugf65gfTxRYFOMZGZUi0rzvXlu0Z7P5VrQr6c= + CC_TEST_REPORTER_ID: + secure: IceCOfujcdUwsTsg1328sbrvO/33N39/9pHxG/1VkMpqt47WDlG1lxbQGV78WuK7exip/JaDcB+iWPNJbyxGirOK0Co64O61iZcKUEbH6wjWxjeZ2yuhpyxyrnUF9OWmk8op4ewkU2ww6tzXQT2Lo+b/g8ryTFag0o8roA9unCj5p42aywZ927UIagaVqQh0sJ/qUUCmwAvGIB8bqKL8nxg97PwgBy38mH5PWE3Bqkm0FBpreKb1x4m4n9wZE29noiImT0xEIZMCwZ4zUPzbpKQmdUe1tHWf0hoQuVPWHLCMwqU2AW/PiY3CqlaAiUX71450WaKDrjbBtUvDl73YaUdiroWoL4rrm2UjlNGFbpEoqEbdBn2HLEefCw+zoo8YEPxieXVUQgmRygGpgoHTrRFqkReLA2BxV6F7IeMZ2AtOW0OXejzjcOEBWnRFs2sF6EqQZL8decye3P5CPcKVzNg28QEBBtdYgYT02qlY8JFv8N6KU9qNMjMvT9yQU8lfbV0iteMtdZl4coinNR34hNf9jMY+uj3/44kHgooygur/A9tHgQt/9/VTpS//y79gG6+ozllwzFQjzWE1AqLUPnPtSJZpvF8F5mmnww/sf/pjsV7jvA9VwyF/paO2JicGIN+bw86FNydXRHP3mmEAfJXOBiJVr5xPD2Wi1Q8Dw3k= services: - - docker +- docker python: - - "2.7" - - "3.6" - - "nightly" -# command to install dependencies +- '2.7' +- '3.6' +- nightly install: - - docker-compose up -d - - pip install . - - pip install -r requirements-test.txt - - docker exec -it wpapipython_woocommerce_1 bash -c 'until [ -f .done ]; do sleep 1; done; echo "complete"' -# command to run tests +- docker-compose up -d +- pip install . +- pip install -r requirements-test.txt +- docker exec -it wpapipython_woocommerce_1 bash -c 'until [ -f .done ]; do sleep + 1; done; echo "complete"' +before_script: +- curl -L https://codeclimate.com/downloads/test-reporter/test-reporter-latest-linux-amd64 > ./cc-test-reporter +- chmod +x ./cc-test-reporter +- "./cc-test-reporter before-build" script: - - pytest +- py.test --cov=wordpress tests +after_success: +- codecov +- "./cc-test-reporter after-build --exit-code $TRAVIS_TEST_RESULT --debug" +deploy: + provider: pypi + user: derwents + skip_existing: true + password: + secure: Hj0CU4CzrA1WhZ/lHW7sDYXjmYSBnEfooRPwgoIMbjk+1G1THNk5A0efR6Vmjh5Ugm8vejHRRCUuYY6UwFE86KbGRUCModx/9rSeoLSKJ1DPIryHNRXgDDJGx2zt8fbvOwnzY7vFcvGDnN65EkGcZxekdwi4kNGQgp54vCQzjvf+dxo4A9+YuHPl8JzQekHQP3uU4VIMGgKSNpMfCdTraBzO8rAX0EuBTEEjczDJn3X77D/vSgqgA6G8oU0tcgnpd9ryydSHoc1icU70D0NUvHTLRr/RNVfIk+QVEcvcFedg4Wo81rcXxta1UQTmyIIZGBGNfNo/68o9GqsD0Edc4oHfZkMqainknaKfke8LtqdEEkOSwuhHX4NdzwBnMCV5rMud2W42hikiJKEPy3cGZJjiabUmEG8IpI1EsiMz5zCod/OVPGq87n4towvnsIpIzyGC7JkbSxHn+NYASkIkX38lhiPzYpgW7VZXRAUPebkxW8P2Bmyx0A8Sli2ijAkx04ul6jk1/HGCB8Y4EtQ9GuefH5pwfV1fhb2lEf56Dyd+REdZia/jiU+dKoPXYv+ZmM1ynrQVwn1/ZCHZejLOzhCGR5Dxk2yjT51hKPHcNzKboR++XiiKML1/cPSTDcGSamADazKuLMqJkP+CWRPkwts+tKBOka0YLCVJwD4ZFgU= diff --git a/README.rst b/README.rst index 3029342..179d2c7 100644 --- a/README.rst +++ b/README.rst @@ -1,7 +1,28 @@ +**A note from the author:** I no longer do Wordpress work, so I won't have the time to adequately maintain this repo. If you would like to maintain a fork of this repo, and want me to link to your fork here, please `let me know `_. + +One such fork is https://github.com/Synoptik-Labs/wp-api-python + +thanks! + Wordpress API - Python Client =============================== -[![Build Status](https://travis-ci.org/derwentx/wp-api-python.svg?branch=master)](https://travis-ci.org/derwentx/wp-api-python) +.. image:: https://travis-ci.org/derwentx/wp-api-python.svg?branch=master + :target: https://travis-ci.org/derwentx/wp-api-python + +.. image:: https://api.codeclimate.com/v1/badges/4df627621037b2df7e5d/maintainability + :target: https://codeclimate.com/github/derwentx/wp-api-python/maintainability + :alt: Maintainability + +.. image:: https://api.codeclimate.com/v1/badges/4df627621037b2df7e5d/test_coverage + :target: https://codeclimate.com/github/derwentx/wp-api-python/test_coverage + :alt: Test Coverage + +.. image:: https://snyk.io/test/github/derwentx/wp-api-python/badge.svg?targetFile=requirements.txt + :target: https://snyk.io/test/github/derwentx/wp-api-python?targetFile=requirements.txt + +.. image:: https://badge.fury.io/py/wordpress-api.svg + :target: https://badge.fury.io/py/wordpress-api A Python wrapper for the Wordpress and WooCommerce REST APIs with oAuth1a 3leg support. @@ -35,6 +56,7 @@ You should have the following plugins installed on your wordpress site: - **WP REST API** (only required for WP < v4.7, recommended version: 2.0+) - **WP REST API - OAuth 1.0a Server** (optional, if you want oauth within the wordpress API. https://github.com/WP-API/OAuth1) - **WP REST API - Meta Endpoints** (optional) +- **WP API Basic Auth** https://github.com/WP-API/Basic-Auth (for image uploading) - **WooCommerce** (optional, if you want to use the WooCommerce API) The following python packages are also used by the package @@ -62,12 +84,21 @@ Download this repo and use setuptools to install the package Testing ------- -If you have installed from source, then you can test with unittest: +Some of the tests make API calls to a dockerized woocommerce container. Don't +worry! It's really simple to set up. You just need to install docker and run + +.. code-block:: bash + + docker-compose up -d + # this just waits until the docker container is set up and exits + docker exec -it wpapipython_woocommerce_1 bash -c 'until [ -f .done ]; do sleep 1; done; echo "complete"' + +Then you can test with: .. code-block:: bash pip install -r requirements-test.txt - python -m unittest -v tests + python setup.py test Publishing ---------- @@ -237,7 +268,7 @@ OPTIONS Upload an image ----- -(Note: this only works on WP API with basic auth) +(Note: this only works on WP API with the Basic Auth plugin enabled: https://github.com/WP-API/Basic-Auth ) .. code-block:: python @@ -250,8 +281,8 @@ Upload an image 'content-disposition': 'attachment; filename=%s' % filename, 'content-type': 'image/%s' % extension } - return wcapi.post(self.endpoint_singular, data, headers=headers) - + endpoint = "/media" + return wpapi.post(endpoint, data, headers=headers) Response -------- @@ -287,10 +318,25 @@ According the the [documentation](https://developer.wordpress.org/rest-api/refer >>> response.json() {“deleted”:true, ... } +A Note on Encoding +==== + +In Python2, make sure to only `POST` unicode string objects or strings that +have been correctly encoded as utf-8. Serializing objects containing non-utf8 +byte strings in Python2 is broken by importing `unicode_literals` from +`__future__` because of a bug in `json.dumps`. You may be able to get around +this problem by serializing the data yourself. + Changelog --------- +1.2.8 - 2018/10/13 +~~~~~~~~~~~~~~~~~~ +- Much better python3 support +- really good tests +- added NoAuth option for adding custom headers (like JWT) + 1.2.7 - 2018/06/18 ~~~~~~~~~~~~~~~~~~ - Don't crash on "-1" response from API. diff --git a/docker-compose.yml b/docker-compose.yml index 1687292..4dc59e1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,5 @@ version: "2" -services: +services: db: image: mariadb environment: @@ -7,7 +7,7 @@ services: MYSQL_DATABASE: "wordpress" MYSQL_ROOT_PASSWORD: "" ports: - - "8081:3306" + - "8082:3306" woocommerce: image: derwentx/woocommerce-api environment: @@ -21,10 +21,17 @@ services: WORDPRESS_ADMIN_PASSWORD: "admin" WORDPRESS_ADMIN_EMAIL: "admin@example.com" WORDPRESS_DEBUG: 1 - WOOCOMMERCE_TEST_DATA: 1 + WORDPRESS_PLUGINS: "https://github.com/WP-API/Basic-Auth/archive/master.zip" + WORDPRESS_API_APPLICATION: "Test" + WORDPRESS_API_DESCRIPTION: "Test Application" + WORDPRESS_API_CALLBACK: "http://127.0.0.1/oauth1_callback" + WORDPRESS_API_KEY: "tYG1tAoqjBEM" + WORDPRESS_API_SECRET: "s91fvylVrqChwzzDbEJHEWyySYtAmlIsqqYdjka1KyVDdAyB" + WOOCOMMERCE_TEST_DATA: "1" + WOOCOMMERCE_TEST_DATA_URL: "https://raw.githubusercontent.com/woocommerce/woocommerce/c81b3cf1655f9983db37bff750cb5baae3c3236e/dummy-data/dummy-products.xml" WOOCOMMERCE_CONSUMER_KEY: "ck_659f6994ae88fed68897f9977298b0e19947979a" WOOCOMMERCE_CONSUMER_SECRET: "cs_9421d39290f966172fef64ae18784a2dc7b20976" - links: + links: - db:mysql ports: - "8083:80" diff --git a/requirements-test.txt b/requirements-test.txt index ced1398..a6d0ee5 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,5 +1,6 @@ -r requirements.txt -httmock==1.2.3 -nose==1.3.7 -six +httmock pytest +pytest-cov<2.6.0 +coverage +codecov diff --git a/requirements.txt b/requirements.txt index b7f329c..da0aed0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,10 @@ -requests==2.7.0 -ordereddict==1.1 -bs4 six +ordereddict requests_oauthlib +pathlib2 +funcsigs +requests +more_itertools +colorama +beautifulsoup4 +urllib3>=1.24.3 diff --git a/setup.cfg b/setup.cfg index f35a17e..224224d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -2,7 +2,7 @@ test=pytest [tool:pytest] addopts = --verbose -python_files = tests.py +python_files = tests/test_*.py [pylama] skip=\.*,build/*,dist/*,*.egg-info [pylama:tests.py] diff --git a/setup.py b/setup.py index 002e762..8c97734 100644 --- a/setup.py +++ b/setup.py @@ -11,13 +11,16 @@ # Get version from __init__.py file VERSION = "" with open("wordpress/__init__.py", "r") as fd: - VERSION = re.search(r"^__version__\s*=\s*['\"]([^\"]*)['\"]", fd.read(), re.MULTILINE).group(1) + VERSION = re.search( + r"^__version__\s*=\s*['\"]([^\"]*)['\"]", fd.read(), re.MULTILINE + ).group(1) if not VERSION: raise RuntimeError("Cannot find version information") # Get long description -README = open(os.path.join(os.path.dirname(__file__), "README.rst"), encoding="utf8").read() +README = open(os.path.join(os.path.dirname(__file__), + "README.rst"), encoding="utf8").read() # allow setup.py to be run from any path os.chdir(os.path.normpath(os.path.join(os.path.abspath(__file__), os.pardir))) @@ -40,9 +43,11 @@ platforms=['any'], install_requires=[ "requests", + "requests_oauthlib", "ordereddict", "beautifulsoup4", - 'lxml' + 'lxml', + 'six', ], setup_requires=[ 'pytest-runner', diff --git a/tests.py b/tests.py deleted file mode 100644 index 00f90c4..0000000 --- a/tests.py +++ /dev/null @@ -1,1113 +0,0 @@ -""" API Tests """ -import functools -import logging -import pdb -import platform -import random -import sys -import traceback -import six -import unittest -from collections import OrderedDict -from copy import copy -from tempfile import mkstemp -from time import time - -import wordpress -from httmock import HTTMock, all_requests, urlmatch -from six import text_type, u -from wordpress import __default_api__, __default_api_version__, auth -from wordpress.api import API -from wordpress.auth import Auth, OAuth -from wordpress.helpers import SeqUtils, StrUtils, UrlUtils -from wordpress.transport import API_Requests_Wrapper - -try: - from urllib.parse import urlencode, quote, unquote, parse_qs, parse_qsl, urlparse, urlunparse - from urllib.parse import ParseResult as URLParseResult -except ImportError: - from urllib import urlencode, quote, unquote - from urlparse import parse_qs, parse_qsl, urlparse, urlunparse - from urlparse import ParseResult as URLParseResult - -def debug_on(*exceptions): - if not exceptions: - exceptions = (AssertionError, ) - def decorator(f): - @functools.wraps(f) - def wrapper(*args, **kwargs): - prev_root = copy(logging.root) - try: - logging.basicConfig(level=logging.DEBUG) - return f(*args, **kwargs) - except exceptions: - info = sys.exc_info() - traceback.print_exception(*info) - pdb.post_mortem(info[2]) - finally: - logging.root = prev_root - return wrapper - return decorator - -CURRENT_TIMESTAMP = int(time()) -SHITTY_NONCE = "" - -class WordpressTestCase(unittest.TestCase): - """Test case for the client methods.""" - - def setUp(self): - self.consumer_key = "ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" - self.consumer_secret = "cs_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" - self.api = wordpress.API( - url="http://woo.test", - consumer_key=self.consumer_key, - consumer_secret=self.consumer_secret - ) - - def test_api(self): - """ Test default API """ - api = wordpress.API( - url="https://woo.test", - consumer_key=self.consumer_key, - consumer_secret=self.consumer_secret - ) - - self.assertEqual(api.namespace, __default_api__) - - def test_version(self): - """ Test default version """ - api = wordpress.API( - url="https://woo.test", - consumer_key=self.consumer_key, - consumer_secret=self.consumer_secret - ) - - self.assertEqual(api.version, __default_api_version__) - - def test_non_ssl(self): - """ Test non-ssl """ - api = wordpress.API( - url="http://woo.test", - consumer_key=self.consumer_key, - consumer_secret=self.consumer_secret - ) - self.assertFalse(api.is_ssl) - - def test_with_ssl(self): - """ Test non-ssl """ - api = wordpress.API( - url="https://woo.test", - consumer_key=self.consumer_key, - consumer_secret=self.consumer_secret - ) - self.assertTrue(api.is_ssl, True) - - def test_with_timeout(self): - """ Test non-ssl """ - api = wordpress.API( - url="https://woo.test", - consumer_key=self.consumer_key, - consumer_secret=self.consumer_secret, - timeout=10, - ) - self.assertEqual(api.timeout, 10) - - @all_requests - def woo_test_mock(*args, **kwargs): - """ URL Mock """ - return {'status_code': 200, - 'content': b'OK'} - - with HTTMock(woo_test_mock): - # call requests - status = api.get("products").status_code - self.assertEqual(status, 200) - - - def test_get(self): - """ Test GET requests """ - @all_requests - def woo_test_mock(*args, **kwargs): - """ URL Mock """ - return {'status_code': 200, - 'content': b'OK'} - - with HTTMock(woo_test_mock): - # call requests - status = self.api.get("products").status_code - self.assertEqual(status, 200) - - def test_post(self): - """ Test POST requests """ - @all_requests - def woo_test_mock(*args, **kwargs): - """ URL Mock """ - return {'status_code': 201, - 'content': b'OK'} - - with HTTMock(woo_test_mock): - # call requests - status = self.api.post("products", {}).status_code - self.assertEqual(status, 201) - - def test_put(self): - """ Test PUT requests """ - @all_requests - def woo_test_mock(*args, **kwargs): - """ URL Mock """ - return {'status_code': 200, - 'content': b'OK'} - - with HTTMock(woo_test_mock): - # call requests - status = self.api.put("products", {}).status_code - self.assertEqual(status, 200) - - def test_delete(self): - """ Test DELETE requests """ - @all_requests - def woo_test_mock(*args, **kwargs): - """ URL Mock """ - return {'status_code': 200, - 'content': b'OK'} - - with HTTMock(woo_test_mock): - # call requests - status = self.api.delete("products").status_code - self.assertEqual(status, 200) - - # @unittest.skip("going by RRC 5849 sorting instead") - def test_oauth_sorted_params(self): - """ Test order of parameters for OAuth signature """ - def check_sorted(keys, expected): - params = auth.OrderedDict() - for key in keys: - params[key] = '' - - params = UrlUtils.sorted_params(params) - ordered = [key for key, value in params] - self.assertEqual(ordered, expected) - - check_sorted(['a', 'b'], ['a', 'b']) - check_sorted(['b', 'a'], ['a', 'b']) - check_sorted(['a', 'b[a]', 'b[b]', 'b[c]', 'c'], ['a', 'b[a]', 'b[b]', 'b[c]', 'c']) - check_sorted(['a', 'b[c]', 'b[a]', 'b[b]', 'c'], ['a', 'b[c]', 'b[a]', 'b[b]', 'c']) - check_sorted(['d', 'b[c]', 'b[a]', 'b[b]', 'c'], ['b[c]', 'b[a]', 'b[b]', 'c', 'd']) - check_sorted(['a1', 'b[c]', 'b[a]', 'b[b]', 'a2'], ['a1', 'a2', 'b[c]', 'b[a]', 'b[b]']) - -class HelperTestcase(unittest.TestCase): - def setUp(self): - self.test_url = "http://ich.local:8888/woocommerce/wc-api/v3/products?filter%5Blimit%5D=2&oauth_consumer_key=ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX&oauth_nonce=c4f2920b0213c43f2e8d3d3333168ec4a22222d1&oauth_signature=3ibOjMuhj6JGnI43BQZGniigHh8%3D&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1481601370&page=2" - - - def test_url_is_ssl(self): - self.assertTrue(UrlUtils.is_ssl("https://woo.test:8888")) - self.assertFalse(UrlUtils.is_ssl("http://woo.test:8888")) - - def test_url_substitute_query(self): - self.assertEqual( - UrlUtils.substitute_query("https://woo.test:8888/sdf?param=value", "newparam=newvalue"), - "https://woo.test:8888/sdf?newparam=newvalue" - ) - self.assertEqual( - UrlUtils.substitute_query("https://woo.test:8888/sdf?param=value"), - "https://woo.test:8888/sdf" - ) - self.assertEqual( - UrlUtils.substitute_query( - "https://woo.test:8888/sdf?param=value", - "newparam=newvalue&othernewparam=othernewvalue" - ), - "https://woo.test:8888/sdf?newparam=newvalue&othernewparam=othernewvalue" - ) - self.assertEqual( - UrlUtils.substitute_query( - "https://woo.test:8888/sdf?param=value", - "newparam=newvalue&othernewparam=othernewvalue" - ), - "https://woo.test:8888/sdf?newparam=newvalue&othernewparam=othernewvalue" - ) - - def test_url_add_query(self): - self.assertEqual( - "https://woo.test:8888/sdf?param=value&newparam=newvalue", - UrlUtils.add_query("https://woo.test:8888/sdf?param=value", 'newparam', 'newvalue') - ) - - def test_url_join_components(self): - self.assertEqual( - 'https://woo.test:8888/wp-json', - UrlUtils.join_components(['https://woo.test:8888/', '', 'wp-json']) - ) - self.assertEqual( - 'https://woo.test:8888/wp-json/wp/v2', - UrlUtils.join_components(['https://woo.test:8888/', 'wp-json', 'wp/v2']) - ) - - def test_url_get_php_value(self): - self.assertEqual( - '1', - UrlUtils.get_value_like_as_php(True) - ) - self.assertEqual( - '', - UrlUtils.get_value_like_as_php(False) - ) - self.assertEqual( - 'asd', - UrlUtils.get_value_like_as_php('asd') - ) - self.assertEqual( - '1', - UrlUtils.get_value_like_as_php(1) - ) - self.assertEqual( - '1', - UrlUtils.get_value_like_as_php(1.0) - ) - self.assertEqual( - '1.1', - UrlUtils.get_value_like_as_php(1.1) - ) - - def test_url_get_query_dict_singular(self): - result = UrlUtils.get_query_dict_singular(self.test_url) - self.assertEquals( - result, - { - 'filter[limit]': '2', - 'oauth_nonce': 'c4f2920b0213c43f2e8d3d3333168ec4a22222d1', - 'oauth_timestamp': '1481601370', - 'oauth_consumer_key': 'ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', - 'oauth_signature_method': 'HMAC-SHA1', - 'oauth_signature': '3ibOjMuhj6JGnI43BQZGniigHh8=', - 'page': '2' - } - ) - - def test_url_get_query_singular(self): - result = UrlUtils.get_query_singular(self.test_url, 'oauth_consumer_key') - self.assertEqual( - result, - 'ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX' - ) - result = UrlUtils.get_query_singular(self.test_url, 'filter[limit]') - self.assertEqual( - text_type(result), - text_type(2) - ) - - def test_url_set_query_singular(self): - result = UrlUtils.set_query_singular(self.test_url, 'filter[limit]', 3) - expected = "http://ich.local:8888/woocommerce/wc-api/v3/products?filter%5Blimit%5D=3&oauth_consumer_key=ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX&oauth_nonce=c4f2920b0213c43f2e8d3d3333168ec4a22222d1&oauth_signature=3ibOjMuhj6JGnI43BQZGniigHh8%3D&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1481601370&page=2" - self.assertEqual(result, expected) - - def test_url_del_query_singular(self): - result = UrlUtils.del_query_singular(self.test_url, 'filter[limit]') - expected = "http://ich.local:8888/woocommerce/wc-api/v3/products?oauth_consumer_key=ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX&oauth_nonce=c4f2920b0213c43f2e8d3d3333168ec4a22222d1&oauth_signature=3ibOjMuhj6JGnI43BQZGniigHh8%3D&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1481601370&page=2" - self.assertEqual(result, expected) - - def test_url_remove_default_port(self): - self.assertEqual( - UrlUtils.remove_default_port('http://www.gooogle.com:80/'), - 'http://www.gooogle.com/' - ) - self.assertEqual( - UrlUtils.remove_default_port('http://www.gooogle.com:18080/'), - 'http://www.gooogle.com:18080/' - ) - - def test_seq_filter_true(self): - self.assertEquals( - ['a', 'b', 'c', 'd'], - SeqUtils.filter_true([None, 'a', False, 'b', 'c','d']) - ) - - def test_str_remove_tail(self): - self.assertEqual( - 'sdf', - StrUtils.remove_tail('sdf/','/') - ) - - def test_str_remove_head(self): - self.assertEqual( - 'sdf', - StrUtils.remove_head('/sdf', '/') - ) - - self.assertEqual( - 'sdf', - StrUtils.decapitate('sdf', '/') - ) - -class TransportTestcases(unittest.TestCase): - def setUp(self): - self.requester = API_Requests_Wrapper( - url='https://woo.test:8888/', - api='wp-json', - api_version='wp/v2' - ) - - def test_api_url(self): - self.assertEqual( - 'https://woo.test:8888/wp-json', - self.requester.api_url - ) - - def test_endpoint_url(self): - self.assertEqual( - 'https://woo.test:8888/wp-json/wp/v2/posts', - self.requester.endpoint_url('posts') - ) - - def test_request(self): - - @all_requests - def woo_test_mock(*args, **kwargs): - """ URL Mock """ - return {'status_code': 200, - 'content': b'OK'} - - with HTTMock(woo_test_mock): - # call requests - response = self.requester.request("GET", "https://woo.test:8888/wp-json/wp/v2/posts") - self.assertEqual(response.status_code, 200) - self.assertEqual(response.request.url, 'https://woo.test:8888/wp-json/wp/v2/posts') - -class BasicAuthTestcases(unittest.TestCase): - def setUp(self): - self.base_url = "http://localhost:8888/wp-api/" - self.api_name = 'wc-api' - self.api_ver = 'v3' - self.endpoint = 'products/26' - self.signature_method = "HMAC-SHA1" - - self.consumer_key = "ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" - self.consumer_secret = "cs_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" - self.api_params = dict( - url=self.base_url, - consumer_key=self.consumer_key, - consumer_secret=self.consumer_secret, - basic_auth=True, - api=self.api_name, - version=self.api_ver, - query_string_auth=False, - ) - - def test_endpoint_url(self): - api = API( - **self.api_params - ) - endpoint_url = api.requester.endpoint_url(self.endpoint) - endpoint_url = api.auth.get_auth_url(endpoint_url, 'GET') - self.assertEqual( - endpoint_url, - UrlUtils.join_components([ - self.base_url, self.api_name, self.api_ver, self.endpoint - ]) - ) - - def test_query_string_endpoint_url(self): - query_string_api_params = dict(**self.api_params) - query_string_api_params.update(dict(query_string_auth=True)) - api = API( - **query_string_api_params - ) - endpoint_url = api.requester.endpoint_url(self.endpoint) - endpoint_url = api.auth.get_auth_url(endpoint_url, 'GET') - expected_endpoint_url = '%s?consumer_key=%s&consumer_secret=%s' % (self.endpoint, self.consumer_key, self.consumer_secret) - expected_endpoint_url = UrlUtils.join_components([self.base_url, self.api_name, self.api_ver, expected_endpoint_url]) - self.assertEqual( - endpoint_url, - expected_endpoint_url - ) - endpoint_url = api.requester.endpoint_url(self.endpoint) - endpoint_url = api.auth.get_auth_url(endpoint_url, 'GET') - - -class OAuthTestcases(unittest.TestCase): - - - def setUp(self): - self.base_url = "http://localhost:8888/wordpress/" - self.api_name = 'wc-api' - self.api_ver = 'v3' - self.endpoint = 'products/99' - self.signature_method = "HMAC-SHA1" - self.consumer_key = "ck_681c2be361e415519dce4b65ee981682cda78bc6" - self.consumer_secret = "cs_b11f652c39a0afd3752fc7bb0c56d60d58da5877" - - self.wcapi = API( - url=self.base_url, - consumer_key=self.consumer_key, - consumer_secret=self.consumer_secret, - api=self.api_name, - version=self.api_ver, - signature_method=self.signature_method - ) - - # RFC EXAMPLE 1 DATA: https://tools.ietf.org/html/draft-hammer-oauth-10#section-1.2 - - self.rfc1_api_url = 'https://photos.example.net/' - self.rfc1_consumer_key = 'dpf43f3p2l4k3l03' - self.rfc1_consumer_secret = 'kd94hf93k423kf44' - self.rfc1_oauth_token = 'hh5s93j4hdidpola' - self.rfc1_signature_method = 'HMAC-SHA1' - self.rfc1_callback = 'http://printer.example.com/ready' - self.rfc1_api = API( - url=self.rfc1_api_url, - consumer_key=self.rfc1_consumer_key, - consumer_secret=self.rfc1_consumer_secret, - api='', - version='', - callback=self.rfc1_callback, - wp_user='', - wp_pass='', - oauth1a_3leg=True - ) - self.rfc1_request_method = 'POST' - self.rfc1_request_target_url = 'https://photos.example.net/initiate' - self.rfc1_request_timestamp = '137131200' - self.rfc1_request_nonce = 'wIjqoS' - self.rfc1_request_params = [ - ('oauth_consumer_key', self.rfc1_consumer_key), - ('oauth_signature_method', self.rfc1_signature_method), - ('oauth_timestamp', self.rfc1_request_timestamp), - ('oauth_nonce', self.rfc1_request_nonce), - ('oauth_callback', self.rfc1_callback), - ] - self.rfc1_request_signature = b'74KNZJeDHnMBp0EMJ9ZHt/XKycU=' - - - # # RFC EXAMPLE 3 DATA: https://tools.ietf.org/html/draft-hammer-oauth-10#section-3.4.1 - # self.rfc3_method = "GET" - # self.rfc3_target_url = 'http://example.com/request' - # self.rfc3_params_raw = [ - # ('b5', r"=%3D"), - # ('a3', "a"), - # ('c@', ""), - # ('a2', 'r b'), - # ('oauth_consumer_key', '9djdj82h48djs9d2'), - # ('oauth_token', 'kkk9d7dh3k39sjv7'), - # ('oauth_signature_method', 'HMAC-SHA1'), - # ('oauth_timestamp', 137131201), - # ('oauth_nonce', '7d8f3e4a'), - # ('c2', ''), - # ('a3', '2 q') - # ] - # self.rfc3_params_encoded = [ - # ('b5', r"%3D%253D"), - # ('a3', "a"), - # ('c%40', ""), - # ('a2', r"r%20b"), - # ('oauth_consumer_key', '9djdj82h48djs9d2'), - # ('oauth_token', 'kkk9d7dh3k39sjv7'), - # ('oauth_signature_method', 'HMAC-SHA1'), - # ('oauth_timestamp', '137131201'), - # ('oauth_nonce', '7d8f3e4a'), - # ('c2', ''), - # ('a3', r"2%20q") - # ] - # self.rfc3_params_sorted = [ - # ('a2', r"r%20b"), - # # ('a3', r"2%20q"), # disallow multiple - # ('a3', "a"), - # ('b5', r"%3D%253D"), - # ('c%40', ""), - # ('c2', ''), - # ('oauth_consumer_key', '9djdj82h48djs9d2'), - # ('oauth_nonce', '7d8f3e4a'), - # ('oauth_signature_method', 'HMAC-SHA1'), - # ('oauth_timestamp', '137131201'), - # ('oauth_token', 'kkk9d7dh3k39sjv7'), - # ] - # self.rfc3_param_string = r"a2=r%20b&a3=2%20q&a3=a&b5=%3D%253D&c%40=&c2=&oauth_consumer_key=9djdj82h48djs9d2&oauth_nonce=7d8f3e4a&oauth_signature_method=HMAC-SHA1&oauth_timestamp=137131201&oauth_token=kkk9d7dh3k39sjv7" - # self.rfc3_base_string = r"GET&http%3A%2F%2Fexample.com%2Frequest&a2%3Dr%2520b%26a3%3D2%2520q%26a3%3Da%26b5%3D%253D%25253D%26c%2540%3D%26c2%3D%26oauth_consumer_key%3D9djdj82h48djs9d2%26oauth_nonce%3D7d8f3e4a%26oauth_signature_method%3DHMAC-SHA1%26oauth_timestamp%3D137131201%26oauth_token%3Dkkk9d7dh3k39sjv7" - - # test data taken from : https://dev.twitter.com/oauth/overview/creating-signatures - - self.twitter_api_url = "https://api.twitter.com/" - self.twitter_consumer_secret = "kAcSOqF21Fu85e7zjz7ZN2U4ZRhfV3WpwPAoE3Z7kBw" - self.twitter_consumer_key = "xvz1evFS4wEEPTGEFPHBog" - self.twitter_signature_method = "HMAC-SHA1" - self.twitter_api = API( - url=self.twitter_api_url, - consumer_key=self.twitter_consumer_key, - consumer_secret=self.twitter_consumer_secret, - api='', - version='1', - signature_method=self.twitter_signature_method, - ) - - self.twitter_method = "POST" - self.twitter_target_url = "https://api.twitter.com/1/statuses/update.json?include_entities=true" - self.twitter_params_raw = [ - ("status", "Hello Ladies + Gentlemen, a signed OAuth request!"), - ("include_entities", "true"), - ("oauth_consumer_key", self.twitter_consumer_key), - ("oauth_nonce", "kYjzVBB8Y0ZFabxSWbWovY3uYSQ2pTgmZeNu2VS4cg"), - ("oauth_signature_method", self.twitter_signature_method), - ("oauth_timestamp", "1318622958"), - ("oauth_token", "370773112-GmHxMAgYyLbNEtIKZeRNFsMKPR9EyMZeS9weJAEb"), - ("oauth_version", "1.0"), - ] - self.twitter_param_string = r"include_entities=true&oauth_consumer_key=xvz1evFS4wEEPTGEFPHBog&oauth_nonce=kYjzVBB8Y0ZFabxSWbWovY3uYSQ2pTgmZeNu2VS4cg&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1318622958&oauth_token=370773112-GmHxMAgYyLbNEtIKZeRNFsMKPR9EyMZeS9weJAEb&oauth_version=1.0&status=Hello%20Ladies%20%2B%20Gentlemen%2C%20a%20signed%20OAuth%20request%21" - self.twitter_signature_base_string = r"POST&https%3A%2F%2Fapi.twitter.com%2F1%2Fstatuses%2Fupdate.json&include_entities%3Dtrue%26oauth_consumer_key%3Dxvz1evFS4wEEPTGEFPHBog%26oauth_nonce%3DkYjzVBB8Y0ZFabxSWbWovY3uYSQ2pTgmZeNu2VS4cg%26oauth_signature_method%3DHMAC-SHA1%26oauth_timestamp%3D1318622958%26oauth_token%3D370773112-GmHxMAgYyLbNEtIKZeRNFsMKPR9EyMZeS9weJAEb%26oauth_version%3D1.0%26status%3DHello%2520Ladies%2520%252B%2520Gentlemen%252C%2520a%2520signed%2520OAuth%2520request%2521" - self.twitter_token_secret = 'LswwdoUaIvS8ltyTt5jkRh4J50vUPVVHtR2YPi5kE' - self.twitter_signing_key = 'kAcSOqF21Fu85e7zjz7ZN2U4ZRhfV3WpwPAoE3Z7kBw&LswwdoUaIvS8ltyTt5jkRh4J50vUPVVHtR2YPi5kE' - self.twitter_oauth_signature = b'tnnArxj06cWHq44gCs1OSKk/jLY=' - - self.lexev_consumer_key='your_app_key' - self.lexev_consumer_secret='your_app_secret' - self.lexev_callback='http://127.0.0.1/oauth1_callback' - self.lexev_signature_method='HMAC-SHA1' - self.lexev_version='1.0' - self.lexev_api = API( - url='https://bitbucket.org/', - api='api', - version='1.0', - consumer_key=self.lexev_consumer_key, - consumer_secret=self.lexev_consumer_secret, - signature_method=self.lexev_signature_method, - callback=self.lexev_callback, - wp_user='', - wp_pass='', - oauth1a_3leg=True - ) - self.lexev_request_method='POST' - self.lexev_request_url='https://bitbucket.org/api/1.0/oauth/request_token' - self.lexev_request_nonce='27718007815082439851427366369' - self.lexev_request_timestamp='1427366369' - self.lexev_request_params=[ - ('oauth_callback',self.lexev_callback), - ('oauth_consumer_key',self.lexev_consumer_key), - ('oauth_nonce',self.lexev_request_nonce), - ('oauth_signature_method',self.lexev_signature_method), - ('oauth_timestamp',self.lexev_request_timestamp), - ('oauth_version',self.lexev_version), - ] - self.lexev_request_signature=b"iPdHNIu4NGOjuXZ+YCdPWaRwvJY=" - self.lexev_resource_url='https://api.bitbucket.org/1.0/repositories/st4lk/django-articles-transmeta/branches' - - # def test_get_sign(self): - # message = "POST&http%3A%2F%2Flocalhost%3A8888%2Fwordpress%2Foauth1%2Frequest&oauth_callback%3Dlocalhost%253A8888%252Fwordpress%26oauth_consumer_key%3DLCLwTOfxoXGh%26oauth_nonce%3D85285179173071287531477036693%26oauth_signature_method%3DHMAC-SHA1%26oauth_timestamp%3D1477036693%26oauth_version%3D1.0" - # signature_method = 'HMAC-SHA1' - # sig_key = 'k7zLzO3mF75Xj65uThpAnNvQHpghp4X1h5N20O8hCbz2kfJq&' - # sig = OAuth.get_sign(message, signature_method, sig_key) - # expected_sig = '8T93S/PDOrEd+N9cm84EDvsPGJ4=' - # self.assertEqual(sig, expected_sig) - - def test_get_sign_key(self): - self.assertEqual( - self.wcapi.auth.get_sign_key(self.consumer_secret), - "%s&" % self.consumer_secret - ) - - self.assertEqual( - self.wcapi.auth.get_sign_key(self.twitter_consumer_secret, self.twitter_token_secret), - self.twitter_signing_key - ) - - def test_flatten_params(self): - self.assertEqual( - UrlUtils.flatten_params(self.twitter_params_raw), - self.twitter_param_string - ) - - def test_sorted_params(self): - # Example given in oauth.net: - oauthnet_example_sorted = [ - ('a', '1'), - ('c', 'hi%%20there'), - ('f', '25'), - ('f', '50'), - ('f', 'a'), - ('z', 'p'), - ('z', 't') - ] - - oauthnet_example = copy(oauthnet_example_sorted) - random.shuffle(oauthnet_example) - - # oauthnet_example_sorted = [ - # ('a', '1'), - # ('c', 'hi%%20there'), - # ('f', '25'), - # ('z', 'p'), - # ] - - self.assertEqual( - UrlUtils.sorted_params(oauthnet_example), - oauthnet_example_sorted - ) - - def test_get_signature_base_string(self): - twitter_param_string = OAuth.get_signature_base_string( - self.twitter_method, - self.twitter_params_raw, - self.twitter_target_url - ) - self.assertEqual( - twitter_param_string, - self.twitter_signature_base_string - ) - - # @unittest.skip("changed order of parms to fit wordpress api") - def test_generate_oauth_signature(self): - - # endpoint_url = UrlUtils.join_components([self.base_url, self.api_name, self.api_ver, self.endpoint]) - # - # params = OrderedDict() - # params["oauth_consumer_key"] = self.consumer_key - # params["oauth_timestamp"] = "1477041328" - # params["oauth_nonce"] = "166182658461433445531477041328" - # params["oauth_signature_method"] = self.signature_method - # params["oauth_version"] = "1.0" - # params["oauth_callback"] = 'localhost:8888/wordpress' - # - # sig = self.wcapi.auth.generate_oauth_signature("POST", params, endpoint_url) - # expected_sig = "517qNKeq/vrLZGj2UH7+q8ILWAg=" - # self.assertEqual(sig, expected_sig) - - # TEST WITH RFC EXAMPLE 1 DATA - - rfc1_request_signature = self.rfc1_api.auth.generate_oauth_signature( - self.rfc1_request_method, - self.rfc1_request_params, - self.rfc1_request_target_url, - '%s&' % self.rfc1_consumer_secret - ) - self.assertEqual( - text_type(rfc1_request_signature), - text_type(self.rfc1_request_signature) - ) - - # TEST WITH RFC EXAMPLE 3 DATA - - # TEST WITH TWITTER DATA - - twitter_signature = self.twitter_api.auth.generate_oauth_signature( - self.twitter_method, - self.twitter_params_raw, - self.twitter_target_url, - self.twitter_signing_key - ) - self.assertEqual(twitter_signature, self.twitter_oauth_signature) - - # TEST WITH LEXEV DATA - - lexev_request_signature = self.lexev_api.auth.generate_oauth_signature( - method=self.lexev_request_method, - params=self.lexev_request_params, - url=self.lexev_request_url - ) - self.assertEqual(lexev_request_signature, self.lexev_request_signature) - - - def test_add_params_sign(self): - endpoint_url = self.wcapi.requester.endpoint_url('products?page=2') - - params = OrderedDict() - params["oauth_consumer_key"] = self.consumer_key - params["oauth_timestamp"] = "1477041328" - params["oauth_nonce"] = "166182658461433445531477041328" - params["oauth_signature_method"] = self.signature_method - params["oauth_version"] = "1.0" - params["oauth_callback"] = 'localhost:8888/wordpress' - - signed_url = self.wcapi.auth.add_params_sign("GET", endpoint_url, params) - - signed_url_params = parse_qsl(urlparse(signed_url).query) - # self.assertEqual('page', signed_url_params[-1][0]) - self.assertIn('page', dict(signed_url_params)) - -class OAuth3LegTestcases(unittest.TestCase): - def setUp(self): - self.consumer_key = "ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" - self.consumer_secret = "cs_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" - self.api = API( - url="http://woo.test", - consumer_key=self.consumer_key, - consumer_secret=self.consumer_secret, - oauth1a_3leg=True, - wp_user='test_user', - wp_pass='test_pass', - callback='http://127.0.0.1/oauth1_callback' - ) - - @urlmatch(path=r'.*wp-json.*') - def woo_api_mock(*args, **kwargs): - """ URL Mock """ - return { - 'status_code': 200, - 'content': b""" - { - "name": "Wordpress", - "description": "Just another WordPress site", - "url": "http://localhost:8888/wordpress", - "home": "http://localhost:8888/wordpress", - "namespaces": [ - "wp/v2", - "oembed/1.0", - "wc/v1" - ], - "authentication": { - "oauth1": { - "request": "http://localhost:8888/wordpress/oauth1/request", - "authorize": "http://localhost:8888/wordpress/oauth1/authorize", - "access": "http://localhost:8888/wordpress/oauth1/access", - "version": "0.1" - } - } - } - """ - } - - @urlmatch(path=r'.*oauth.*') - def woo_authentication_mock(*args, **kwargs): - """ URL Mock """ - return { - 'status_code':200, - 'content': b"""oauth_token=XXXXXXXXXXXX&oauth_token_secret=YYYYYYYYYYYY""" - } - - def test_get_sign_key(self): - oauth_token_secret = "PNW9j1yBki3e7M7EqB5qZxbe9n5tR6bIIefSMQ9M2pdyRI9g" - - key = self.api.auth.get_sign_key(self.consumer_secret, oauth_token_secret) - self.assertEqual( - key, - "%s&%s" % (self.consumer_secret, oauth_token_secret) - ) - self.assertEqual(type(key), type("")) - - def test_auth_discovery(self): - - with HTTMock(self.woo_api_mock): - # call requests - authentication = self.api.auth.authentication - self.assertEquals( - authentication, - { - "oauth1": { - "request": "http://localhost:8888/wordpress/oauth1/request", - "authorize": "http://localhost:8888/wordpress/oauth1/authorize", - "access": "http://localhost:8888/wordpress/oauth1/access", - "version": "0.1" - } - } - ) - - def test_get_request_token(self): - - with HTTMock(self.woo_api_mock): - authentication = self.api.auth.authentication - self.assertTrue(authentication) - - with HTTMock(self.woo_authentication_mock): - request_token, request_token_secret = self.api.auth.get_request_token() - self.assertEquals(request_token, 'XXXXXXXXXXXX') - self.assertEquals(request_token_secret, 'YYYYYYYYYYYY') - - def test_store_access_creds(self): - _, creds_store_path = mkstemp("wp-api-python-test-store-access-creds.json") - api = API( - url="http://woo.test", - consumer_key=self.consumer_key, - consumer_secret=self.consumer_secret, - oauth1a_3leg=True, - wp_user='test_user', - wp_pass='test_pass', - callback='http://127.0.0.1/oauth1_callback', - access_token='XXXXXXXXXXXX', - access_token_secret='YYYYYYYYYYYY', - creds_store=creds_store_path - ) - api.auth.store_access_creds() - - with open(creds_store_path) as creds_store_file: - self.assertEqual( - creds_store_file.read(), - '{"access_token": "XXXXXXXXXXXX", "access_token_secret": "YYYYYYYYYYYY"}' - ) - - def test_retrieve_access_creds(self): - _, creds_store_path = mkstemp("wp-api-python-test-store-access-creds.json") - with open(creds_store_path, 'w+') as creds_store_file: - creds_store_file.write('{"access_token": "XXXXXXXXXXXX", "access_token_secret": "YYYYYYYYYYYY"}') - - api = API( - url="http://woo.test", - consumer_key=self.consumer_key, - consumer_secret=self.consumer_secret, - oauth1a_3leg=True, - wp_user='test_user', - wp_pass='test_pass', - callback='http://127.0.0.1/oauth1_callback', - creds_store=creds_store_path - ) - - api.auth.retrieve_access_creds() - - self.assertEqual( - api.auth.access_token, - 'XXXXXXXXXXXX' - ) - - self.assertEqual( - api.auth.access_token_secret, - 'YYYYYYYYYYYY' - ) - -class WCApiTestCasesBase(unittest.TestCase): - """ Base class for WC API Test cases """ - def setUp(self): - Auth.force_timestamp = CURRENT_TIMESTAMP - Auth.force_nonce = SHITTY_NONCE - self.api_params = { - 'url':'http://localhost:8083/', - 'api':'wc-api', - 'version':'v3', - 'consumer_key':'ck_659f6994ae88fed68897f9977298b0e19947979a', - 'consumer_secret':'cs_9421d39290f966172fef64ae18784a2dc7b20976', - } - -class WCApiTestCasesLegacy(WCApiTestCasesBase): - """ Tests for WC API V3 """ - def setUp(self): - super(WCApiTestCasesLegacy, self).setUp() - self.api_params['version'] = 'v3' - self.api_params['api'] = 'wc-api' - - - def test_APIGet(self): - wcapi = API(**self.api_params) - response = wcapi.get('products') - # print UrlUtils.beautify_response(response) - self.assertIn(response.status_code, [200,201]) - response_obj = response.json() - self.assertIn('products', response_obj) - self.assertEqual(len(response_obj['products']), 10) - # print "test_APIGet", response_obj - - def test_APIGetWithSimpleQuery(self): - wcapi = API(**self.api_params) - response = wcapi.get('products?page=2') - # print UrlUtils.beautify_response(response) - self.assertIn(response.status_code, [200,201]) - - response_obj = response.json() - self.assertIn('products', response_obj) - self.assertEqual(len(response_obj['products']), 8) - # print "test_ApiGenWithSimpleQuery", response_obj - - def test_APIGetWithComplexQuery(self): - wcapi = API(**self.api_params) - response = wcapi.get('products?page=2&filter%5Blimit%5D=2') - self.assertIn(response.status_code, [200,201]) - response_obj = response.json() - self.assertIn('products', response_obj) - self.assertEqual(len(response_obj['products']), 2) - - response = wcapi.get('products?oauth_consumer_key=ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX&oauth_nonce=037470f3b08c9232b0888f52cb9d4685b44d8fd1&oauth_signature=wrKfuIjbwi%2BTHynAlTP4AssoPS0%3D&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1481606275&filter%5Blimit%5D=3') - self.assertIn(response.status_code, [200,201]) - response_obj = response.json() - self.assertIn('products', response_obj) - self.assertEqual(len(response_obj['products']), 3) - - def test_APIPutWithSimpleQuery(self): - wcapi = API(**self.api_params) - response = wcapi.get('products') - first_product = (response.json())['products'][0] - original_title = first_product['title'] - product_id = first_product['id'] - - nonce = b"%f" % (random.random()) - response = wcapi.put('products/%s?filter%%5Blimit%%5D=5' % (product_id), {"product":{"title":text_type(nonce)}}) - request_params = UrlUtils.get_query_dict_singular(response.request.url) - response_obj = response.json() - self.assertEqual(response_obj['product']['title'], text_type(nonce)) - self.assertEqual(request_params['filter[limit]'], text_type(5)) - - wcapi.put('products/%s' % (product_id), {"product":{"title":original_title}}) - - -class WCApiTestCases(WCApiTestCasesBase): - oauth1a_3leg = False - """ Tests for New wp-json/wc/v2 API """ - def setUp(self): - super(WCApiTestCases, self).setUp() - self.api_params['version'] = 'wc/v2' - self.api_params['api'] = 'wp-json' - self.api_params['callback'] = 'http://127.0.0.1/oauth1_callback' - if self.oauth1a_3leg: - self.api_params['oauth1a_3leg'] = True - - # @debug_on() - def test_APIGet(self): - wcapi = API(**self.api_params) - per_page = 10 - response = wcapi.get('products?per_page=%d' % per_page) - self.assertIn(response.status_code, [200,201]) - response_obj = response.json() - self.assertEqual(len(response_obj), per_page) - - - def test_APIPutWithSimpleQuery(self): - wcapi = API(**self.api_params) - response = wcapi.get('products') - first_product = (response.json())[0] - # from pprint import pformat - # print "first product %s" % pformat(response.json()) - original_title = first_product['name'] - product_id = first_product['id'] - - nonce = b"%f" % (random.random()) - response = wcapi.put('products/%s?page=2&per_page=5' % (product_id), {"name":text_type(nonce)}) - request_params = UrlUtils.get_query_dict_singular(response.request.url) - response_obj = response.json() - self.assertEqual(response_obj['name'], text_type(nonce)) - self.assertEqual(request_params['per_page'], '5') - - wcapi.put('products/%s' % (product_id), {"name":original_title}) - - def test_APIPostWithLatin1Query(self): - wcapi = API(**self.api_params) - nonce = u"%f\u00ae" % random.random() - - data = { - "name": nonce.encode('latin-1'), - "type": "simple", - } - - if six.PY2: - response = wcapi.post('products', data) - response_obj = response.json() - product_id = response_obj.get('id') - self.assertEqual(response_obj.get('name'), nonce) - wcapi.delete('products/%s' % product_id) - return - with self.assertRaises(TypeError): - response = wcapi.post('products', data) - - def test_APIPostWithUTF8Query(self): - wcapi = API(**self.api_params) - nonce = u"%f\u00ae" % random.random() - - data = { - "name": nonce.encode('utf8'), - "type": "simple", - } - - if six.PY2: - response = wcapi.post('products', data) - response_obj = response.json() - product_id = response_obj.get('id') - self.assertEqual(response_obj.get('name'), nonce) - wcapi.delete('products/%s' % product_id) - return - with self.assertRaises(TypeError): - response = wcapi.post('products', data) - - - def test_APIPostWithUnicodeQuery(self): - wcapi = API(**self.api_params) - nonce = u"%f\u00ae" % random.random() - - data = { - "name": nonce, - "type": "simple", - } - - response = wcapi.post('products', data) - response_obj = response.json() - product_id = response_obj.get('id') - self.assertEqual(response_obj.get('name'), nonce) - wcapi.delete('products/%s' % product_id) - -@unittest.skip("these simply don't work for some reason") -class WCApiTestCases3Leg(WCApiTestCases): - """ Tests for New wp-json/wc/v2 API with 3-leg """ - oauth1a_3leg = True - -@unittest.skipIf(platform.uname()[1] != "Derwents-MacBook-Pro.local", "should only work on my machine") -class WPAPITestCasesBase(unittest.TestCase): - def setUp(self): - Auth.force_timestamp = CURRENT_TIMESTAMP - Auth.force_nonce = SHITTY_NONCE - self.creds_store = '~/wc-api-creds-test.json' - self.api_params = { - 'url':'http://derwent-mac.ddns.me:18080/wptest/', - 'api':'wp-json', - 'version':'wp/v2', - 'consumer_key':'tYG1tAoqjBEM', - 'consumer_secret':'s91fvylVrqChwzzDbEJHEWyySYtAmlIsqqYdjka1KyVDdAyB', - 'callback':'http://127.0.0.1/oauth1_callback', - 'wp_user':'wptest', - 'wp_pass':'gZ*gZk#v0t5$j#NQ@9', - 'oauth1a_3leg':True, - } - - # @debug_on() - def test_APIGet(self): - self.wpapi = API(**self.api_params) - response = self.wpapi.get('users/me') - self.assertIn(response.status_code, [200,201]) - response_obj = response.json() - self.assertEqual(response_obj['name'], self.api_params['wp_user']) - -class WPAPITestCasesBasic(WPAPITestCasesBase): - def setUp(self): - super(WPAPITestCasesBasic, self).setUp() - self.api_params.update({ - 'user_auth': True, - 'basic_auth': True, - 'query_string_auth': False, - }) - self.wpapi = API(**self.api_params) - -# class WPAPITestCasesBasicV1(WPAPITestCasesBase): -# def setUp(self): -# super(WPAPITestCasesBasicV1, self).setUp() -# self.api_params.update({ -# 'user_auth': True, -# 'basic_auth': True, -# 'query_string_auth': False, -# 'version': 'wp/v1' -# }) -# self.wpapi = API(**self.api_params) -# -# def test_get_endpoint_url(self): -# self.api_params.update({ -# 'version': '' -# }) -# self.wpapi = API(**self.api_params) -# endpoint_url = self.wpapi.requester.endpoint_url('') -# print endpoint_url -# -# def test_APIGetWithSimpleQuery(self): -# response = self.wpapi.get('posts') -# self.assertIn(response.status_code, [200,201]) - - -class WPAPITestCases3leg(WPAPITestCasesBase): - def setUp(self): - super(WPAPITestCases3leg, self).setUp() - self.api_params.update({ - 'creds_store': self.creds_store, - }) - self.wpapi = API(**self.api_params) - self.wpapi.auth.clear_stored_creds() - - def test_APIGetWithSimpleQuery(self): - response = self.wpapi.get('media?page=2&per_page=2') - # print UrlUtils.beautify_response(response) - self.assertIn(response.status_code, [200,201]) - - response_obj = response.json() - self.assertEqual(len(response_obj), 2) - # print "test_ApiGenWithSimpleQuery", response_obj - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..3fe6c03 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,36 @@ +""" +Test case module. +""" + +from time import time +import sys +import logging +import pdb +import functools +import traceback +import copy + +CURRENT_TIMESTAMP = int(time()) +SHITTY_NONCE = "" +DEFAULT_ENCODING = sys.getdefaultencoding() + + +def debug_on(*exceptions): + if not exceptions: + exceptions = (AssertionError, ) + + def decorator(f): + @functools.wraps(f) + def wrapper(*args, **kwargs): + prev_root = copy(logging.root) + try: + logging.basicConfig(level=logging.DEBUG) + return f(*args, **kwargs) + except exceptions: + info = sys.exc_info() + traceback.print_exception(*info) + pdb.post_mortem(info[2]) + finally: + logging.root = prev_root + return wrapper + return decorator diff --git a/tests/data/test.jpg b/tests/data/test.jpg new file mode 100644 index 0000000..ea01a22 Binary files /dev/null and b/tests/data/test.jpg differ diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000..e9c39f8 --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,535 @@ +""" API Tests """ +from __future__ import unicode_literals + +import os +import random +import unittest + +import requests + +import six +import wordpress +from httmock import HTTMock, all_requests +from six import text_type +from wordpress import __default_api__, __default_api_version__, auth +from wordpress.api import API +from wordpress.auth import Auth +from wordpress.helpers import StrUtils, UrlUtils + +from . import CURRENT_TIMESTAMP, SHITTY_NONCE + + +class WordpressTestCase(unittest.TestCase): + """Test case for the mocked client methods.""" + + def setUp(self): + self.consumer_key = "ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + self.consumer_secret = "cs_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + self.api = wordpress.API( + url="http://woo.test", + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret + ) + + def test_api(self): + """ Test default API """ + api = wordpress.API( + url="https://woo.test", + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret + ) + + self.assertEqual(api.namespace, __default_api__) + + def test_version(self): + """ Test default version """ + api = wordpress.API( + url="https://woo.test", + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret + ) + + self.assertEqual(api.version, __default_api_version__) + + def test_non_ssl(self): + """ Test non-ssl """ + api = wordpress.API( + url="http://woo.test", + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret + ) + self.assertFalse(api.is_ssl) + + def test_with_ssl(self): + """ Test non-ssl """ + api = wordpress.API( + url="https://woo.test", + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret + ) + self.assertTrue(api.is_ssl, True) + + def test_with_timeout(self): + """ Test non-ssl """ + api = wordpress.API( + url="https://woo.test", + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret, + timeout=10, + ) + self.assertEqual(api.timeout, 10) + + @all_requests + def woo_test_mock(*args, **kwargs): + """ URL Mock """ + return {'status_code': 200, + 'content': b'OK'} + + with HTTMock(woo_test_mock): + # call requests + status = api.get("products").status_code + self.assertEqual(status, 200) + + def test_get(self): + """ Test GET requests """ + @all_requests + def woo_test_mock(*args, **kwargs): + """ URL Mock """ + return {'status_code': 200, + 'content': b'OK'} + + with HTTMock(woo_test_mock): + # call requests + status = self.api.get("products").status_code + self.assertEqual(status, 200) + + def test_post(self): + """ Test POST requests """ + @all_requests + def woo_test_mock(*args, **kwargs): + """ URL Mock """ + return {'status_code': 201, + 'content': b'OK'} + + with HTTMock(woo_test_mock): + # call requests + status = self.api.post("products", {}).status_code + self.assertEqual(status, 201) + + def test_put(self): + """ Test PUT requests """ + @all_requests + def woo_test_mock(*args, **kwargs): + """ URL Mock """ + return {'status_code': 200, + 'content': b'OK'} + + with HTTMock(woo_test_mock): + # call requests + status = self.api.put("products", {}).status_code + self.assertEqual(status, 200) + + def test_delete(self): + """ Test DELETE requests """ + @all_requests + def woo_test_mock(*args, **kwargs): + """ URL Mock """ + return {'status_code': 200, + 'content': b'OK'} + + with HTTMock(woo_test_mock): + # call requests + status = self.api.delete("products").status_code + self.assertEqual(status, 200) + + # @unittest.skip("going by RRC 5849 sorting instead") + def test_oauth_sorted_params(self): + """ Test order of parameters for OAuth signature """ + def check_sorted(keys, expected): + params = auth.OrderedDict() + for key in keys: + params[key] = '' + + params = UrlUtils.sorted_params(params) + ordered = [key for key, value in params] + self.assertEqual(ordered, expected) + + check_sorted(['a', 'b'], ['a', 'b']) + check_sorted(['b', 'a'], ['a', 'b']) + check_sorted(['a', 'b[a]', 'b[b]', 'b[c]', 'c'], + ['a', 'b[a]', 'b[b]', 'b[c]', 'c']) + check_sorted(['a', 'b[c]', 'b[a]', 'b[b]', 'c'], + ['a', 'b[c]', 'b[a]', 'b[b]', 'c']) + check_sorted(['d', 'b[c]', 'b[a]', 'b[b]', 'c'], + ['b[c]', 'b[a]', 'b[b]', 'c', 'd']) + check_sorted(['a1', 'b[c]', 'b[a]', 'b[b]', 'a2'], + ['a1', 'a2', 'b[c]', 'b[a]', 'b[b]']) + + +class WCApiTestCasesBase(unittest.TestCase): + """ Base class for WC API Test cases """ + + def setUp(self): + Auth.force_timestamp = CURRENT_TIMESTAMP + Auth.force_nonce = SHITTY_NONCE + self.api_params = { + 'url': 'http://localhost:8083/', + 'api': 'wc-api', + 'version': 'v3', + 'consumer_key': 'ck_659f6994ae88fed68897f9977298b0e19947979a', + 'consumer_secret': 'cs_9421d39290f966172fef64ae18784a2dc7b20976', + } + + +class WCApiTestCasesLegacy(WCApiTestCasesBase): + """ Tests for WC API V3 """ + + def setUp(self): + super(WCApiTestCasesLegacy, self).setUp() + self.api_params['version'] = 'v3' + self.api_params['api'] = 'wc-api' + + def test_APIGet(self): + wcapi = API(**self.api_params) + response = wcapi.get('products') + # print UrlUtils.beautify_response(response) + self.assertIn(response.status_code, [200, 201]) + response_obj = response.json() + self.assertIn('products', response_obj) + self.assertEqual(len(response_obj['products']), 10) + # print "test_APIGet", response_obj + + def test_APIGetWithSimpleQuery(self): + wcapi = API(**self.api_params) + response = wcapi.get('products?page=2') + # print UrlUtils.beautify_response(response) + self.assertIn(response.status_code, [200, 201]) + + response_obj = response.json() + self.assertIn('products', response_obj) + self.assertEqual(len(response_obj['products']), 8) + # print "test_ApiGenWithSimpleQuery", response_obj + + def test_APIGetWithComplexQuery(self): + wcapi = API(**self.api_params) + response = wcapi.get('products?page=2&filter%5Blimit%5D=2') + self.assertIn(response.status_code, [200, 201]) + response_obj = response.json() + self.assertIn('products', response_obj) + self.assertEqual(len(response_obj['products']), 2) + + response = wcapi.get( + 'products?' + 'oauth_consumer_key=ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX&' + 'oauth_nonce=037470f3b08c9232b0888f52cb9d4685b44d8fd1&' + 'oauth_signature=wrKfuIjbwi%2BTHynAlTP4AssoPS0%3D&' + 'oauth_signature_method=HMAC-SHA1&' + 'oauth_timestamp=1481606275&' + 'filter%5Blimit%5D=3' + ) + self.assertIn(response.status_code, [200, 201]) + response_obj = response.json() + self.assertIn('products', response_obj) + self.assertEqual(len(response_obj['products']), 3) + + def test_APIPutWithSimpleQuery(self): + wcapi = API(**self.api_params) + response = wcapi.get('products') + first_product = (response.json())['products'][0] + original_title = first_product['title'] + product_id = first_product['id'] + + nonce = b"%f" % (random.random()) + response = wcapi.put('products/%s?filter%%5Blimit%%5D=5' % + (product_id), + {"product": {"title": text_type(nonce)}}) + request_params = UrlUtils.get_query_dict_singular(response.request.url) + response_obj = response.json() + self.assertEqual(response_obj['product']['title'], text_type(nonce)) + self.assertEqual(request_params['filter[limit]'], text_type(5)) + + wcapi.put('products/%s' % (product_id), + {"product": {"title": original_title}}) + + +class WCApiTestCases(WCApiTestCasesBase): + oauth1a_3leg = False + """ Tests for New wp-json/wc/v2 API """ + + def setUp(self): + super(WCApiTestCases, self).setUp() + self.api_params['version'] = 'wc/v2' + self.api_params['api'] = 'wp-json' + self.api_params['callback'] = 'http://127.0.0.1/oauth1_callback' + if self.oauth1a_3leg: + self.api_params['oauth1a_3leg'] = True + + # @debug_on() + def test_APIGet(self): + wcapi = API(**self.api_params) + per_page = 10 + response = wcapi.get('products?per_page=%d' % per_page) + self.assertIn(response.status_code, [200, 201]) + response_obj = response.json() + self.assertEqual(len(response_obj), per_page) + + def test_APIPutWithSimpleQuery(self): + wcapi = API(**self.api_params) + response = wcapi.get('products') + first_product = (response.json())[0] + # from pprint import pformat + # print "first product %s" % pformat(response.json()) + original_title = first_product['name'] + product_id = first_product['id'] + + nonce = b"%f" % (random.random()) + response = wcapi.put('products/%s?page=2&per_page=5' % + (product_id), {"name": text_type(nonce)}) + request_params = UrlUtils.get_query_dict_singular(response.request.url) + response_obj = response.json() + self.assertEqual(response_obj['name'], text_type(nonce)) + self.assertEqual(request_params['per_page'], '5') + + wcapi.put('products/%s' % (product_id), {"name": original_title}) + + @unittest.skipIf(six.PY2, "non-utf8 bytes not supported in python2") + def test_APIPostWithBytesQuery(self): + wcapi = API(**self.api_params) + nonce = b"%f\xff" % random.random() + + data = { + "name": nonce, + "type": "simple", + } + + response = wcapi.post('products', data) + response_obj = response.json() + product_id = response_obj.get('id') + + expected = StrUtils.to_text(nonce, encoding='ascii', errors='replace') + + self.assertEqual( + response_obj.get('name'), + expected, + ) + wcapi.delete('products/%s' % product_id) + + @unittest.skipIf(six.PY2, "non-utf8 bytes not supported in python2") + def test_APIPostWithLatin1Query(self): + wcapi = API(**self.api_params) + nonce = "%f\u00ae" % random.random() + + data = { + "name": nonce.encode('latin-1'), + "type": "simple", + } + + response = wcapi.post('products', data) + response_obj = response.json() + product_id = response_obj.get('id') + + expected = StrUtils.to_text( + StrUtils.to_binary(nonce, encoding='latin-1'), + encoding='ascii', errors='replace' + ) + + self.assertEqual( + response_obj.get('name'), + expected + ) + wcapi.delete('products/%s' % product_id) + + def test_APIPostWithUTF8Query(self): + wcapi = API(**self.api_params) + nonce = "%f\u00ae" % random.random() + + data = { + "name": nonce.encode('utf8'), + "type": "simple", + } + + response = wcapi.post('products', data) + response_obj = response.json() + product_id = response_obj.get('id') + self.assertEqual(response_obj.get('name'), nonce) + wcapi.delete('products/%s' % product_id) + + def test_APIPostWithUnicodeQuery(self): + wcapi = API(**self.api_params) + nonce = "%f\u00ae" % random.random() + + data = { + "name": nonce, + "type": "simple", + } + + response = wcapi.post('products', data) + response_obj = response.json() + product_id = response_obj.get('id') + self.assertEqual(response_obj.get('name'), nonce) + wcapi.delete('products/%s' % product_id) + + +@unittest.skip("these simply don't work for some reason") +class WCApiTestCases3Leg(WCApiTestCases): + """ Tests for New wp-json/wc/v2 API with 3-leg """ + oauth1a_3leg = True + + +class WPAPITestCasesBase(unittest.TestCase): + api_params = { + 'url': 'http://localhost:8083/', + 'api': 'wp-json', + 'version': 'wp/v2', + 'consumer_key': 'tYG1tAoqjBEM', + 'consumer_secret': 's91fvylVrqChwzzDbEJHEWyySYtAmlIsqqYdjka1KyVDdAyB', + 'callback': 'http://127.0.0.1/oauth1_callback', + 'wp_user': 'admin', + 'wp_pass': 'admin', + 'oauth1a_3leg': True, + } + + def setUp(self): + Auth.force_timestamp = CURRENT_TIMESTAMP + Auth.force_nonce = SHITTY_NONCE + self.wpapi = API(**self.api_params) + + # @debug_on() + def test_APIGet(self): + response = self.wpapi.get('users/me') + self.assertIn(response.status_code, [200, 201]) + response_obj = response.json() + self.assertEqual(response_obj['name'], self.api_params['wp_user']) + + def test_APIGetWithSimpleQuery(self): + response = self.wpapi.get('pages?page=2&per_page=2') + self.assertIn(response.status_code, [200, 201]) + + response_obj = response.json() + self.assertEqual(len(response_obj), 2) + + def test_APIPostData(self): + nonce = "%f\u00ae" % random.random() + + content = "api test post" + + data = { + "title": nonce, + "content": content, + "excerpt": content + } + + response = self.wpapi.post('posts', data) + response_obj = response.json() + post_id = response_obj.get('id') + self.assertEqual(response_obj.get('title').get('raw'), nonce) + self.wpapi.delete('posts/%s' % post_id) + + def test_APIPostBadData(self): + """ + No excerpt so should fail to be created. + """ + nonce = "%f\u00ae" % random.random() + + data = { + 'a': nonce + } + + with self.assertRaises(UserWarning): + self.wpapi.post('posts', data) + + def test_APIPostBadDataHandleBadStatus(self): + """ + Test handling explicitly a bad status code for a request. + """ + nonce = "%f\u00ae" % random.random() + + data = { + 'a': nonce + } + + response = self.wpapi.post('posts', data, handle_status_codes=[400]) + self.assertEqual(response.status_code, 400) + + # If we don't specify a correct status code to handle we should + # still expect an exception + with self.assertRaises(UserWarning): + self.wpapi.post('posts', data, handle_status_codes=[404]) + + def test_APIPostMedia(self): + img_path = 'tests/data/test.jpg' + with open(img_path, 'rb') as test_file: + img_data = test_file.read() + img_name = os.path.basename(img_path) + + res = self.wpapi.post( + 'media', + data=img_data, + headers={ + 'Content-Type': 'image/jpg', + 'Content-Disposition' : 'attachment; filename=%s'% img_name + } + ) + + self.assertEqual(res.status_code, 201) + res_obj = res.json() + created_id = res_obj.get('id') + self.assertTrue(created_id) + uploaded_res = requests.get(res_obj.get('source_url')) + + # check for bug where image bytestream was quoted + self.assertNotEqual(StrUtils.to_binary(uploaded_res.text[0]), b'"') + + self.wpapi.delete('media/%s?force=True' % created_id) + + def test_APIPostComplexContent(self): + data = { + 'content': "this content has links" + } + res = self.wpapi.post('posts', data) + + self.assertEqual(res.status_code, 201) + res_obj = res.json() + res_id= res_obj.get('id') + self.assertTrue(res_id) + print(res_obj) + res_content = res_obj.get('content').get('raw') + self.assertEqual(data.get('content'), res_content) + + # def test_APIPostMediaBadCreds(self): + # """ + # TODO: make sure the warning is "ensure login and basic auth is installed" + # """ + # img_path = 'tests/data/test.jpg' + # with open(img_path, 'rb') as test_file: + # img_data = test_file.read() + # img_name = os.path.basename(img_path) + # res = self.wpapi.post( + # 'media', + # data=img_data, + # headers={ + # 'Content-Type': 'image/jpg', + # 'Content-Disposition' : 'attachment; filename=%s'% img_name + # } + # ) + + +class WPAPITestCasesBasic(WPAPITestCasesBase): + api_params = dict(**WPAPITestCasesBase.api_params) + api_params.update({ + 'user_auth': True, + 'basic_auth': True, + 'query_string_auth': False, + }) + + +class WPAPITestCases3leg(WPAPITestCasesBase): + + api_params = dict(**WPAPITestCasesBase.api_params) + api_params.update({ + 'creds_store': '~/wc-api-creds-test.json', + }) + + def setUp(self): + super(WPAPITestCases3leg, self).setUp() + self.wpapi.auth.clear_stored_creds() diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..42893ce --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,478 @@ +""" API Tests """ +from __future__ import unicode_literals + +import random +import unittest +from collections import OrderedDict +from copy import copy +from tempfile import mkstemp + +from httmock import HTTMock, urlmatch +from six import text_type +from six.moves.urllib.parse import parse_qsl, urlparse +from wordpress.api import API +from wordpress.auth import OAuth +from wordpress.helpers import StrUtils, UrlUtils + + +class BasicAuthTestcases(unittest.TestCase): + def setUp(self): + self.base_url = "http://localhost:8888/wp-api/" + self.api_name = 'wc-api' + self.api_ver = 'v3' + self.endpoint = 'products/26' + self.signature_method = "HMAC-SHA1" + + self.consumer_key = "ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + self.consumer_secret = "cs_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + self.api_params = dict( + url=self.base_url, + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret, + basic_auth=True, + api=self.api_name, + version=self.api_ver, + query_string_auth=False, + ) + + def test_endpoint_url(self): + api = API( + **self.api_params + ) + endpoint_url = api.requester.endpoint_url(self.endpoint) + endpoint_url = api.auth.get_auth_url(endpoint_url, 'GET') + self.assertEqual( + endpoint_url, + UrlUtils.join_components([ + self.base_url, self.api_name, self.api_ver, self.endpoint + ]) + ) + + def test_query_string_endpoint_url(self): + query_string_api_params = dict(**self.api_params) + query_string_api_params.update(dict(query_string_auth=True)) + api = API( + **query_string_api_params + ) + endpoint_url = api.requester.endpoint_url(self.endpoint) + endpoint_url = api.auth.get_auth_url(endpoint_url, 'GET') + expected_endpoint_url = '%s?consumer_key=%s&consumer_secret=%s' % ( + self.endpoint, self.consumer_key, self.consumer_secret) + expected_endpoint_url = UrlUtils.join_components( + [self.base_url, self.api_name, self.api_ver, expected_endpoint_url] + ) + self.assertEqual( + endpoint_url, + expected_endpoint_url + ) + endpoint_url = api.requester.endpoint_url(self.endpoint) + endpoint_url = api.auth.get_auth_url(endpoint_url, 'GET') + + +class OAuthTestcases(unittest.TestCase): + + def setUp(self): + self.base_url = "http://localhost:8888/wordpress/" + self.api_name = 'wc-api' + self.api_ver = 'v3' + self.endpoint = 'products/99' + self.signature_method = "HMAC-SHA1" + self.consumer_key = "ck_681c2be361e415519dce4b65ee981682cda78bc6" + self.consumer_secret = "cs_b11f652c39a0afd3752fc7bb0c56d60d58da5877" + + self.wcapi = API( + url=self.base_url, + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret, + api=self.api_name, + version=self.api_ver, + signature_method=self.signature_method + ) + + self.rfc1_api_url = 'https://photos.example.net/' + self.rfc1_consumer_key = 'dpf43f3p2l4k3l03' + self.rfc1_consumer_secret = 'kd94hf93k423kf44' + self.rfc1_oauth_token = 'hh5s93j4hdidpola' + self.rfc1_signature_method = 'HMAC-SHA1' + self.rfc1_callback = 'http://printer.example.com/ready' + self.rfc1_api = API( + url=self.rfc1_api_url, + consumer_key=self.rfc1_consumer_key, + consumer_secret=self.rfc1_consumer_secret, + api='', + version='', + callback=self.rfc1_callback, + wp_user='', + wp_pass='', + oauth1a_3leg=True + ) + self.rfc1_request_method = 'POST' + self.rfc1_request_target_url = 'https://photos.example.net/initiate' + self.rfc1_request_timestamp = '137131200' + self.rfc1_request_nonce = 'wIjqoS' + self.rfc1_request_params = [ + ('oauth_consumer_key', self.rfc1_consumer_key), + ('oauth_signature_method', self.rfc1_signature_method), + ('oauth_timestamp', self.rfc1_request_timestamp), + ('oauth_nonce', self.rfc1_request_nonce), + ('oauth_callback', self.rfc1_callback), + ] + self.rfc1_request_signature = b'74KNZJeDHnMBp0EMJ9ZHt/XKycU=' + + self.twitter_api_url = "https://api.twitter.com/" + self.twitter_consumer_secret = \ + "kAcSOqF21Fu85e7zjz7ZN2U4ZRhfV3WpwPAoE3Z7kBw" + self.twitter_consumer_key = "xvz1evFS4wEEPTGEFPHBog" + self.twitter_signature_method = "HMAC-SHA1" + self.twitter_api = API( + url=self.twitter_api_url, + consumer_key=self.twitter_consumer_key, + consumer_secret=self.twitter_consumer_secret, + api='', + version='1', + signature_method=self.twitter_signature_method, + ) + + self.twitter_method = "POST" + self.twitter_target_url = ( + "https://api.twitter.com/1/statuses/update.json?" + "include_entities=true" + ) + self.twitter_params_raw = [ + ("status", "Hello Ladies + Gentlemen, a signed OAuth request!"), + ("include_entities", "true"), + ("oauth_consumer_key", self.twitter_consumer_key), + ("oauth_nonce", "kYjzVBB8Y0ZFabxSWbWovY3uYSQ2pTgmZeNu2VS4cg"), + ("oauth_signature_method", self.twitter_signature_method), + ("oauth_timestamp", "1318622958"), + ("oauth_token", + "370773112-GmHxMAgYyLbNEtIKZeRNFsMKPR9EyMZeS9weJAEb"), + ("oauth_version", "1.0"), + ] + self.twitter_param_string = ( + r"include_entities=true&" + r"oauth_consumer_key=xvz1evFS4wEEPTGEFPHBog&" + r"oauth_nonce=kYjzVBB8Y0ZFabxSWbWovY3uYSQ2pTgmZeNu2VS4cg&" + r"oauth_signature_method=HMAC-SHA1&" + r"oauth_timestamp=1318622958&" + r"oauth_token=370773112-GmHxMAgYyLbNEtIKZeRNFsMKPR9EyMZeS9weJAEb&" + r"oauth_version=1.0&" + r"status=Hello%20Ladies%20%2B%20Gentlemen%2C%20a%20" + r"signed%20OAuth%20request%21" + ) + self.twitter_signature_base_string = ( + r"POST&" + r"https%3A%2F%2Fapi.twitter.com%2F1%2Fstatuses%2Fupdate.json&" + r"include_entities%3Dtrue%26" + r"oauth_consumer_key%3Dxvz1evFS4wEEPTGEFPHBog%26" + r"oauth_nonce%3DkYjzVBB8Y0ZFabxSWbWovY3uYSQ2pTgmZeNu2VS4cg%26" + r"oauth_signature_method%3DHMAC-SHA1%26" + r"oauth_timestamp%3D1318622958%26" + r"oauth_token%3D370773112-" + r"GmHxMAgYyLbNEtIKZeRNFsMKPR9EyMZeS9weJAEb%26" + r"oauth_version%3D1.0%26" + r"status%3DHello%2520Ladies%2520%252B%2520Gentlemen%252C%2520" + r"a%2520signed%2520OAuth%2520request%2521" + ) + self.twitter_token_secret = 'LswwdoUaIvS8ltyTt5jkRh4J50vUPVVHtR2YPi5kE' + self.twitter_signing_key = ( + 'kAcSOqF21Fu85e7zjz7ZN2U4ZRhfV3WpwPAoE3Z7kBw&' + 'LswwdoUaIvS8ltyTt5jkRh4J50vUPVVHtR2YPi5kE' + ) + self.twitter_oauth_signature = b'tnnArxj06cWHq44gCs1OSKk/jLY=' + + self.lexev_consumer_key = 'your_app_key' + self.lexev_consumer_secret = 'your_app_secret' + self.lexev_callback = 'http://127.0.0.1/oauth1_callback' + self.lexev_signature_method = 'HMAC-SHA1' + self.lexev_version = '1.0' + self.lexev_api = API( + url='https://bitbucket.org/', + api='api', + version='1.0', + consumer_key=self.lexev_consumer_key, + consumer_secret=self.lexev_consumer_secret, + signature_method=self.lexev_signature_method, + callback=self.lexev_callback, + wp_user='', + wp_pass='', + oauth1a_3leg=True + ) + self.lexev_request_method = 'POST' + self.lexev_request_url = \ + 'https://bitbucket.org/api/1.0/oauth/request_token' + self.lexev_request_nonce = '27718007815082439851427366369' + self.lexev_request_timestamp = '1427366369' + self.lexev_request_params = [ + ('oauth_callback', self.lexev_callback), + ('oauth_consumer_key', self.lexev_consumer_key), + ('oauth_nonce', self.lexev_request_nonce), + ('oauth_signature_method', self.lexev_signature_method), + ('oauth_timestamp', self.lexev_request_timestamp), + ('oauth_version', self.lexev_version), + ] + self.lexev_request_signature = b"iPdHNIu4NGOjuXZ+YCdPWaRwvJY=" + self.lexev_resource_url = ( + 'https://api.bitbucket.org/1.0/repositories/st4lk/' + 'django-articles-transmeta/branches' + ) + + def test_get_sign_key(self): + self.assertEqual( + StrUtils.to_binary( + self.wcapi.auth.get_sign_key(self.consumer_secret)), + StrUtils.to_binary("%s&" % self.consumer_secret) + ) + + self.assertEqual( + StrUtils.to_binary(self.wcapi.auth.get_sign_key( + self.twitter_consumer_secret, self.twitter_token_secret)), + StrUtils.to_binary(self.twitter_signing_key) + ) + + def test_flatten_params(self): + self.assertEqual( + StrUtils.to_binary(UrlUtils.flatten_params( + self.twitter_params_raw)), + StrUtils.to_binary(self.twitter_param_string) + ) + + def test_sorted_params(self): + # Example given in oauth.net: + oauthnet_example_sorted = [ + ('a', '1'), + ('c', 'hi%%20there'), + ('f', '25'), + ('f', '50'), + ('f', 'a'), + ('z', 'p'), + ('z', 't') + ] + + oauthnet_example = copy(oauthnet_example_sorted) + random.shuffle(oauthnet_example) + + self.assertEqual( + UrlUtils.sorted_params(oauthnet_example), + oauthnet_example_sorted + ) + + def test_get_signature_base_string(self): + twitter_param_string = OAuth.get_signature_base_string( + self.twitter_method, + self.twitter_params_raw, + self.twitter_target_url + ) + self.assertEqual( + twitter_param_string, + self.twitter_signature_base_string + ) + + def test_generate_oauth_signature(self): + + rfc1_request_signature = self.rfc1_api.auth.generate_oauth_signature( + self.rfc1_request_method, + self.rfc1_request_params, + self.rfc1_request_target_url, + '%s&' % self.rfc1_consumer_secret + ) + self.assertEqual( + text_type(rfc1_request_signature), + text_type(self.rfc1_request_signature) + ) + + # TEST WITH RFC EXAMPLE 3 DATA + + # TEST WITH TWITTER DATA + + twitter_signature = self.twitter_api.auth.generate_oauth_signature( + self.twitter_method, + self.twitter_params_raw, + self.twitter_target_url, + self.twitter_signing_key + ) + self.assertEqual(twitter_signature, self.twitter_oauth_signature) + + # TEST WITH LEXEV DATA + + lexev_request_signature = self.lexev_api.auth.generate_oauth_signature( + method=self.lexev_request_method, + params=self.lexev_request_params, + url=self.lexev_request_url + ) + self.assertEqual(lexev_request_signature, self.lexev_request_signature) + + def test_add_params_sign(self): + endpoint_url = self.wcapi.requester.endpoint_url('products?page=2') + + params = OrderedDict() + params["oauth_consumer_key"] = self.consumer_key + params["oauth_timestamp"] = "1477041328" + params["oauth_nonce"] = "166182658461433445531477041328" + params["oauth_signature_method"] = self.signature_method + params["oauth_version"] = "1.0" + params["oauth_callback"] = 'localhost:8888/wordpress' + + signed_url = self.wcapi.auth.add_params_sign( + "GET", endpoint_url, params) + + signed_url_params = parse_qsl(urlparse(signed_url).query) + # self.assertEqual('page', signed_url_params[-1][0]) + self.assertIn('page', dict(signed_url_params)) + + +class OAuth3LegTestcases(unittest.TestCase): + def setUp(self): + self.consumer_key = "ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + self.consumer_secret = "cs_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + self.api = API( + url="http://woo.test", + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret, + oauth1a_3leg=True, + wp_user='test_user', + wp_pass='test_pass', + callback='http://127.0.0.1/oauth1_callback' + ) + + @urlmatch(path=r'.*wp-json.*') + def woo_api_mock(*args, **kwargs): + """ URL Mock """ + return { + 'status_code': 200, + 'content': b""" + { + "name": "Wordpress", + "description": "Just another WordPress site", + "url": "http://localhost:8888/wordpress", + "home": "http://localhost:8888/wordpress", + "namespaces": [ + "wp/v2", + "oembed/1.0", + "wc/v1" + ], + "authentication": { + "oauth1": { + "request": + "http://localhost:8888/wordpress/oauth1/request", + "authorize": + "http://localhost:8888/wordpress/oauth1/authorize", + "access": + "http://localhost:8888/wordpress/oauth1/access", + "version": "0.1" + } + } + } + """ + } + + @urlmatch(path=r'.*oauth.*') + def woo_authentication_mock(*args, **kwargs): + """ URL Mock """ + return { + 'status_code': 200, + 'content': + b"""oauth_token=XXXXXXXXXXXX&oauth_token_secret=YYYYYYYYYYYY""" + } + + def test_get_sign_key(self): + oauth_token_secret = "PNW9j1yBki3e7M7EqB5qZxbe9n5tR6bIIefSMQ9M2pdyRI9g" + + key = self.api.auth.get_sign_key( + self.consumer_secret, oauth_token_secret) + self.assertEqual( + StrUtils.to_binary(key), + StrUtils.to_binary("%s&%s" % + (self.consumer_secret, oauth_token_secret)) + ) + + def test_auth_discovery(self): + + with HTTMock(self.woo_api_mock): + # call requests + authentication = self.api.auth.authentication + self.assertEquals( + authentication, + { + "oauth1": { + "request": + "http://localhost:8888/wordpress/oauth1/request", + "authorize": + "http://localhost:8888/wordpress/oauth1/authorize", + "access": + "http://localhost:8888/wordpress/oauth1/access", + "version": "0.1" + } + } + ) + + def test_get_request_token(self): + + with HTTMock(self.woo_api_mock): + authentication = self.api.auth.authentication + self.assertTrue(authentication) + + with HTTMock(self.woo_authentication_mock): + request_token, request_token_secret = \ + self.api.auth.get_request_token() + self.assertEquals(request_token, 'XXXXXXXXXXXX') + self.assertEquals(request_token_secret, 'YYYYYYYYYYYY') + + def test_store_access_creds(self): + _, creds_store_path = mkstemp( + "wp-api-python-test-store-access-creds.json") + api = API( + url="http://woo.test", + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret, + oauth1a_3leg=True, + wp_user='test_user', + wp_pass='test_pass', + callback='http://127.0.0.1/oauth1_callback', + access_token='XXXXXXXXXXXX', + access_token_secret='YYYYYYYYYYYY', + creds_store=creds_store_path + ) + api.auth.store_access_creds() + + with open(creds_store_path) as creds_store_file: + self.assertEqual( + creds_store_file.read(), + ('{"access_token": "XXXXXXXXXXXX", ' + '"access_token_secret": "YYYYYYYYYYYY"}') + ) + + def test_retrieve_access_creds(self): + _, creds_store_path = mkstemp( + "wp-api-python-test-store-access-creds.json") + with open(creds_store_path, 'w+') as creds_store_file: + creds_store_file.write( + ('{"access_token": "XXXXXXXXXXXX", ' + '"access_token_secret": "YYYYYYYYYYYY"}')) + + api = API( + url="http://woo.test", + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret, + oauth1a_3leg=True, + wp_user='test_user', + wp_pass='test_pass', + callback='http://127.0.0.1/oauth1_callback', + creds_store=creds_store_path + ) + + api.auth.retrieve_access_creds() + + self.assertEqual( + api.auth.access_token, + 'XXXXXXXXXXXX' + ) + + self.assertEqual( + api.auth.access_token_secret, + 'YYYYYYYYYYYY' + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_helpers.py b/tests/test_helpers.py new file mode 100644 index 0000000..da07de8 --- /dev/null +++ b/tests/test_helpers.py @@ -0,0 +1,188 @@ +""" API Tests """ +from __future__ import unicode_literals + +import unittest + +from six import text_type +from wordpress.helpers import SeqUtils, StrUtils, UrlUtils + + +class HelperTestcase(unittest.TestCase): + def setUp(self): + self.test_url = ( + "http://ich.local:8888/woocommerce/wc-api/v3/products?" + "filter%5Blimit%5D=2&" + "oauth_consumer_key=ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX&" + "oauth_nonce=c4f2920b0213c43f2e8d3d3333168ec4a22222d1&" + "oauth_signature=3ibOjMuhj6JGnI43BQZGniigHh8%3D&" + "oauth_signature_method=HMAC-SHA1&" + "oauth_timestamp=1481601370&page=2" + ) + + def test_url_is_ssl(self): + self.assertTrue(UrlUtils.is_ssl("https://woo.test:8888")) + self.assertFalse(UrlUtils.is_ssl("http://woo.test:8888")) + + def test_url_substitute_query(self): + self.assertEqual( + UrlUtils.substitute_query( + "https://woo.test:8888/sdf?param=value", "newparam=newvalue"), + "https://woo.test:8888/sdf?newparam=newvalue" + ) + self.assertEqual( + UrlUtils.substitute_query("https://woo.test:8888/sdf?param=value"), + "https://woo.test:8888/sdf" + ) + self.assertEqual( + UrlUtils.substitute_query( + "https://woo.test:8888/sdf?param=value", + "newparam=newvalue&othernewparam=othernewvalue" + ), + ( + "https://woo.test:8888/sdf?newparam=newvalue&" + "othernewparam=othernewvalue" + ) + ) + self.assertEqual( + UrlUtils.substitute_query( + "https://woo.test:8888/sdf?param=value", + "newparam=newvalue&othernewparam=othernewvalue" + ), + ( + "https://woo.test:8888/sdf?newparam=newvalue&" + "othernewparam=othernewvalue" + ) + ) + + def test_url_add_query(self): + self.assertEqual( + "https://woo.test:8888/sdf?param=value&newparam=newvalue", + UrlUtils.add_query( + "https://woo.test:8888/sdf?param=value", 'newparam', 'newvalue' + ) + ) + + def test_url_join_components(self): + self.assertEqual( + 'https://woo.test:8888/wp-json', + UrlUtils.join_components(['https://woo.test:8888/', '', 'wp-json']) + ) + self.assertEqual( + 'https://woo.test:8888/wp-json/wp/v2', + UrlUtils.join_components( + ['https://woo.test:8888/', 'wp-json', 'wp/v2']) + ) + + def test_url_get_php_value(self): + self.assertEqual( + '1', + UrlUtils.get_value_like_as_php(True) + ) + self.assertEqual( + '', + UrlUtils.get_value_like_as_php(False) + ) + self.assertEqual( + 'asd', + UrlUtils.get_value_like_as_php('asd') + ) + self.assertEqual( + '1', + UrlUtils.get_value_like_as_php(1) + ) + self.assertEqual( + '1', + UrlUtils.get_value_like_as_php(1.0) + ) + self.assertEqual( + '1.1', + UrlUtils.get_value_like_as_php(1.1) + ) + + def test_url_get_query_dict_singular(self): + result = UrlUtils.get_query_dict_singular(self.test_url) + self.assertEquals( + result, + { + 'filter[limit]': '2', + 'oauth_nonce': 'c4f2920b0213c43f2e8d3d3333168ec4a22222d1', + 'oauth_timestamp': '1481601370', + 'oauth_consumer_key': + 'ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', + 'oauth_signature_method': 'HMAC-SHA1', + 'oauth_signature': '3ibOjMuhj6JGnI43BQZGniigHh8=', + 'page': '2' + } + ) + + def test_url_get_query_singular(self): + result = UrlUtils.get_query_singular( + self.test_url, 'oauth_consumer_key') + self.assertEqual( + result, + 'ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX' + ) + result = UrlUtils.get_query_singular(self.test_url, 'filter[limit]') + self.assertEqual( + text_type(result), + text_type(2) + ) + + def test_url_set_query_singular(self): + result = UrlUtils.set_query_singular(self.test_url, 'filter[limit]', 3) + expected = ( + "http://ich.local:8888/woocommerce/wc-api/v3/products?" + "filter%5Blimit%5D=3&" + "oauth_consumer_key=ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX&" + "oauth_nonce=c4f2920b0213c43f2e8d3d3333168ec4a22222d1&" + "oauth_signature=3ibOjMuhj6JGnI43BQZGniigHh8%3D&" + "oauth_signature_method=HMAC-SHA1&oauth_timestamp=1481601370&" + "page=2" + ) + self.assertEqual(result, expected) + + def test_url_del_query_singular(self): + result = UrlUtils.del_query_singular(self.test_url, 'filter[limit]') + expected = ( + "http://ich.local:8888/woocommerce/wc-api/v3/products?" + "oauth_consumer_key=ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX&" + "oauth_nonce=c4f2920b0213c43f2e8d3d3333168ec4a22222d1&" + "oauth_signature=3ibOjMuhj6JGnI43BQZGniigHh8%3D&" + "oauth_signature_method=HMAC-SHA1&" + "oauth_timestamp=1481601370&" + "page=2" + ) + self.assertEqual(result, expected) + + def test_url_remove_default_port(self): + self.assertEqual( + UrlUtils.remove_default_port('http://www.gooogle.com:80/'), + 'http://www.gooogle.com/' + ) + self.assertEqual( + UrlUtils.remove_default_port('http://www.gooogle.com:18080/'), + 'http://www.gooogle.com:18080/' + ) + + def test_seq_filter_true(self): + self.assertEquals( + ['a', 'b', 'c', 'd'], + SeqUtils.filter_true([None, 'a', False, 'b', 'c', 'd']) + ) + + def test_str_remove_tail(self): + self.assertEqual( + 'sdf', + StrUtils.remove_tail('sdf/', '/') + ) + + def test_str_remove_head(self): + self.assertEqual( + 'sdf', + StrUtils.remove_head('/sdf', '/') + ) + + self.assertEqual( + 'sdf', + StrUtils.decapitate('sdf', '/') + ) diff --git a/tests/test_transport.py b/tests/test_transport.py new file mode 100644 index 0000000..a221d3c --- /dev/null +++ b/tests/test_transport.py @@ -0,0 +1,43 @@ +""" API Tests """ +from __future__ import unicode_literals + +import unittest + +from httmock import HTTMock, all_requests +from wordpress.transport import API_Requests_Wrapper + + +class TransportTestcases(unittest.TestCase): + def setUp(self): + self.requester = API_Requests_Wrapper( + url='https://woo.test:8888/', + api='wp-json', + api_version='wp/v2' + ) + + def test_api_url(self): + self.assertEqual( + 'https://woo.test:8888/wp-json', + self.requester.api_url + ) + + def test_endpoint_url(self): + self.assertEqual( + 'https://woo.test:8888/wp-json/wp/v2/posts', + self.requester.endpoint_url('posts') + ) + + def test_request(self): + @all_requests + def woo_test_mock(*args, **kwargs): + """ URL Mock """ + return {'status_code': 200, + 'content': b'OK'} + + with HTTMock(woo_test_mock): + # call requests + response = self.requester.request( + "GET", "https://woo.test:8888/wp-json/wp/v2/posts") + self.assertEqual(response.status_code, 200) + self.assertEqual(response.request.url, + 'https://woo.test:8888/wp-json/wp/v2/posts') diff --git a/wordpress/__init__.py b/wordpress/__init__.py index 0bfb350..ab933ec 100644 --- a/wordpress/__init__.py +++ b/wordpress/__init__.py @@ -10,7 +10,7 @@ """ __title__ = "wordpress" -__version__ = "1.2.7" +__version__ = "1.2.9" __author__ = "Claudio Sanches @ WooThemes, forked by Derwent" __license__ = "MIT" diff --git a/wordpress/api.py b/wordpress/api.py index 37ced7e..79a1114 100644 --- a/wordpress/api.py +++ b/wordpress/api.py @@ -4,17 +4,18 @@ Wordpress API Class """ -__title__ = "wordpress-api" +from __future__ import unicode_literals # from requests import request import logging -from json import dumps as jsonencode -from six import text_type, binary_type -from wordpress.auth import BasicAuth, OAuth, OAuth_3Leg +from six import text_type +from wordpress.auth import BasicAuth, NoAuth, OAuth, OAuth_3Leg from wordpress.helpers import StrUtils, UrlUtils from wordpress.transport import API_Requests_Wrapper +__title__ = "wordpress-api" + class API(object): """ API Class """ @@ -35,9 +36,15 @@ def __init__(self, url, consumer_key, consumer_secret, **kwargs): auth_class = BasicAuth elif kwargs.get('oauth1a_3leg'): auth_class = OAuth_3Leg + elif kwargs.get('no_auth'): + auth_class = NoAuth - if kwargs.get('version', '').startswith('wc') and kwargs.get('oauth1a_3leg'): - self.logger.warn("WooCommerce JSON Api does not seem to support 3leg") + if ( + kwargs.get('version', '').startswith('wc') + and kwargs.get('oauth1a_3leg') + ): + self.logger.warn( + "WooCommerce JSON Api does not seem to support 3leg") self.auth = auth_class(**auth_kwargs) @@ -103,36 +110,43 @@ def request_post_mortem(self, response=None): try_hostname_mismatch = False - if isinstance(response_json, dict) and ('code' in response_json or 'message' in response_json): - reason = u" - ".join([ - unicode(response_json.get(key)) for key in ['code', 'message', 'data'] \ + if ( + isinstance(response_json, dict) + and ('code' in response_json or 'message' in response_json) + ): + reason = " - ".join([ + text_type(response_json.get(key)) + for key in ['code', 'message', 'data'] if key in response_json ]) - code = response_json.get('code') + code = text_type(response_json.get('code')) if code == 'rest_user_invalid_email': remedy = "Try checking the email %s doesn't already exist" % \ - request_body.get('email') + request_body.get('email') elif code == 'json_oauth1_consumer_mismatch': remedy = "Try deleting the cached credentials at %s" % \ - self.auth.creds_store + self.auth.creds_store elif code == 'woocommerce_rest_cannot_view': if not self.auth.query_string_auth: remedy = "Try enabling query_string_auth" else: remedy = ( - "This error is super generic and can be caused by just " - "about anything. Here are some things to try: \n" + "This error is super generic and can be caused by " + "just about anything. Here are some things to try: \n" " - Check that the account which as assigned to your " "oAuth creds has the correct access level\n" " - Enable logging and check for error messages in " "wp-content and wp-content/uploads/wc-logs\n" - " - Check that your query string parameters are valid\n" - " - Make sure your server is not messing with authentication headers\n" + " - Check that your query string parameters are " + "valid\n" + " - Make sure your server is not messing with " + "authentication headers\n" " - Try a different endpoint\n" - " - Try enabling HTTPS and using basic authentication\n" + " - Try enabling HTTPS and using basic " + "authentication\n" ) elif code == 'woocommerce_rest_authentication_error': @@ -155,25 +169,34 @@ def request_post_mortem(self, response=None): if header_api_url: header_api_url = StrUtils.eviscerate(header_api_url, '/') - if header_api_url and requester_api_url\ - and header_api_url != requester_api_url: - reason = "hostname mismatch. %s != %s" % ( - header_api_url, requester_api_url - ) + if ( + header_api_url and requester_api_url + and StrUtils.to_text(header_api_url) + != StrUtils.to_text(requester_api_url) + ): + reason = "hostname mismatch. %s != %s" % tuple(map( + StrUtils.to_text, [ + header_api_url, requester_api_url + ] + )) header_url = StrUtils.eviscerate(header_api_url, '/') - header_url = StrUtils.eviscerate(header_url, self.requester.api) + header_url = StrUtils.eviscerate( + header_url, self.requester.api) header_url = StrUtils.eviscerate(header_url, '/') remedy = "try changing url to %s" % header_url - msg = u"API call to %s returned \nCODE: %s\nRESPONSE:%s \nHEADERS: %s\nREQ_BODY:%s" % ( + msg = ( + "API call to %s returned \nCODE: " + "%s\nRESPONSE:%s \nHEADERS: %s\nREQ_BODY:%s" + ) % tuple(map(StrUtils.to_text, [ request_url, - text_type(response.status_code), + response.status_code, UrlUtils.beautify_response(response), - text_type(response_headers), - repr(request_body)[:1000] - ) + response_headers, + StrUtils.to_binary(request_body)[:1000] + ])) if reason: - msg += "\nBecause of %s" % reason + msg += "\nBecause of %s" % StrUtils.to_binary(reason) if remedy: msg += "\n%s" % remedy raise UserWarning(msg) @@ -185,19 +208,18 @@ def __request(self, method, endpoint, data, **kwargs): endpoint_url = self.auth.get_auth_url(endpoint_url, method, **kwargs) auth = self.auth.get_auth() - content_type = kwargs.get('headers', {}).get('content-type', 'application/json') + content_type = 'application/json' + for key, value in kwargs.get('headers', {}).items(): + if key.lower() == 'content-type': + content_type = value.lower() if data is not None and content_type.startswith('application/json'): - data = jsonencode(data, ensure_ascii=False) + data = StrUtils.jsonencode(data, ensure_ascii=False) + # enforce utf-8 encoded binary - if isinstance(data, binary_type): - try: - data = data.decode('utf-8') - except UnicodeDecodeError: - data = data.decode('latin-1') - if isinstance(data, text_type): - data = data.encode('utf-8') + data = StrUtils.to_binary(data) + handle_status_codes = kwargs.pop('handle_status_codes', []) response = self.requester.request( method=method, @@ -207,7 +229,7 @@ def __request(self, method, endpoint, data, **kwargs): **kwargs ) - if response.status_code not in [200, 201, 202]: + if response.status_code not in [200, 201, 202] + handle_status_codes: self.request_post_mortem(response) return response diff --git a/wordpress/auth.py b/wordpress/auth.py index af4b7bd..819c957 100644 --- a/wordpress/auth.py +++ b/wordpress/auth.py @@ -6,38 +6,25 @@ __title__ = "wordpress-auth" -# from base64 import b64encode import binascii import json import logging import os +from collections import OrderedDict from hashlib import sha1, sha256 from hmac import new as HMAC +from pprint import pformat from random import randint from time import time -from pprint import pformat -# import webbrowser import requests from requests.auth import HTTPBasicAuth -from requests_oauthlib import OAuth1 from bs4 import BeautifulSoup -from wordpress.helpers import UrlUtils - -try: - from urllib.parse import urlencode, quote, unquote, parse_qs, parse_qsl, urlparse, urlunparse - from urllib.parse import ParseResult as URLParseResult -except ImportError: - from urllib import urlencode, quote, unquote - from urlparse import parse_qs, parse_qsl, urlparse, urlunparse - from urlparse import ParseResult as URLParseResult - -try: - from collections import OrderedDict -except ImportError: - from ordereddict import OrderedDict +from six.moves.urllib.parse import parse_qs, parse_qsl, quote, urlparse +from wordpress import __version__ +from .helpers import StrUtils, UrlUtils class Auth(object): @@ -64,8 +51,10 @@ def get_auth(self): """ Returns the auth parameter used in requests """ pass + class BasicAuth(Auth): """ Does not perform any signing, just logs in with oauth creds """ + def __init__(self, requester, consumer_key, consumer_secret, **kwargs): super(BasicAuth, self).__init__(requester, **kwargs) self.consumer_key = consumer_key @@ -94,6 +83,16 @@ def get_auth(self): return HTTPBasicAuth(self.consumer_key, self.consumer_secret) +class NoAuth(Auth): + """ + Just a dummy Auth object to allow header based + authorization per request + """ + + def get_auth_url(self, endpoint_url, method, **kwargs): + return endpoint_url + + class OAuth(Auth): """ Signs string with oauth consumer_key and consumer_secret """ oauth_version = '1.0' @@ -113,21 +112,25 @@ def __init__(self, requester, consumer_key, consumer_secret, **kwargs): self.force_nonce = kwargs.pop('force_nonce', None) def get_sign_key(self, consumer_secret, token_secret=None): - "gets consumer_secret and turns it into a bytestring suitable for signing" + """Get consumer_secret, convert to bytestring suitable for signing.""" if not consumer_secret: raise UserWarning("no consumer_secret provided") token_secret = str(token_secret) if token_secret else '' if self.api_namespace == 'wc-api' \ - and self.api_version in ["v1", "v2"]: + and self.api_version in ["v1", "v2"]: # special conditions for wc-api v1-2 key = consumer_secret else: - key = "%s&%s" % (consumer_secret, token_secret) + key = StrUtils.to_binary("%s&%s" % (consumer_secret, token_secret)) return key def add_params_sign(self, method, url, params, sign_key=None, **kwargs): - """ Adds the params to a given url, signs the url with sign_key if provided, - otherwise generates sign_key automatically and returns a signed url """ + """ + Add the params to a given url. + + Sign the url with sign_key if provided, otherwise generate + sign_key automatically and return a signed url. + """ if isinstance(params, dict): params = list(params.items()) @@ -150,12 +153,13 @@ def add_params_sign(self, method, url, params, sign_key=None, **kwargs): if key != "oauth_signature": params_without_signature.append((key, value)) - self.logger.debug('sorted_params before sign: %s' % pformat(params_without_signature) ) - - signature = self.generate_oauth_signature(method, params_without_signature, url, sign_key) + self.logger.debug('sorted_params before sign: %s' % + pformat(params_without_signature)) - self.logger.debug('signature: %s' % signature ) + signature = self.generate_oauth_signature( + method, params_without_signature, url, sign_key) + self.logger.debug('signature: %s' % signature) params = params_without_signature + [("oauth_signature", signature)] @@ -187,7 +191,7 @@ def get_signature_base_string(cls, method, params, url): url = UrlUtils.substitute_query(url) base_request_uri = quote(url, "") query_string = UrlUtils.flatten_params(params) - query_string = quote( query_string, '~') + query_string = quote(query_string, '~') return "%s&%s&%s" % ( method.upper(), base_request_uri, query_string ) @@ -210,8 +214,8 @@ def generate_oauth_signature(self, method, params, url, key=None): # print "\nstring_to_sign: %s" % repr(string_to_sign) # print "\nkey: %s" % repr(key) sig = HMAC( - bytes(key.encode('utf-8')), - bytes(string_to_sign.encode('utf-8')), + StrUtils.to_binary(key), + StrUtils.to_binary(string_to_sign), hmac_mod ) sig_b64 = binascii.b2a_base64(sig.digest())[:-1] @@ -237,13 +241,21 @@ def generate_nonce(cls): sha1 ).hexdigest() + class OAuth_3Leg(OAuth): - """ Provides 3 legged OAuth1a, mostly based off this: http://www.lexev.org/en/2015/oauth-step-step/""" + """ + Provide 3 legged OAuth1a. + + Mostly based off this: http://www.lexev.org/en/2015/oauth-step-step/ + """ # oauth_version = '1.0A' - def __init__(self, requester, consumer_key, consumer_secret, callback, **kwargs): - super(OAuth_3Leg, self).__init__(requester, consumer_key, consumer_secret, **kwargs) + def __init__( + self, requester, consumer_key, consumer_secret, callback, **kwargs + ): + super(OAuth_3Leg, self).__init__( + requester, consumer_key, consumer_secret, **kwargs) self.callback = callback self.wp_user = kwargs.pop('wp_user', None) self.wp_pass = kwargs.pop('wp_pass', None) @@ -257,32 +269,44 @@ def __init__(self, requester, consumer_key, consumer_secret, callback, **kwargs) @property def authentication(self): - """ This is an object holding the authentication links discovered from the API - automatically generated if accessed before generated """ + """ + Provide authentication links discovered from the API. + + Automatically generated if accessed before generated. + """ if not self._authentication: self._authentication = self.discover_auth() return self._authentication @property def oauth_verifier(self): - """ This is the verifier string used in authentication - automatically generated if accessed before generated """ + """ + Verifier string used in authentication. + + Automatically generated if accessed before generated. + """ if not self._oauth_verifier: self._oauth_verifier = self.get_verifier() return self._oauth_verifier @property def request_token(self): - """ This is the oauth_token used in requesting an access_token - automatically generated if accessed before generated """ + """ + OAuth token used in requesting an access_token. + + Automatically generated if accessed before generated. + """ if not self._request_token: self.get_request_token() return self._request_token @property def access_token(self): - """ This is the oauth_token used to sign requests to protected resources - automatically generated if accessed before generated """ + """ + OAuth token used to sign requests to protected resources. + + Automatically generated if accessed before generated. + """ if not self._access_token and self.creds_store: self.retrieve_access_creds() if not self._access_token: @@ -294,8 +318,10 @@ def creds_store(self): if self._creds_store: return os.path.expanduser(self._creds_store) - def get_auth_url(self, endpoint_url, method): - """ Returns the URL with OAuth params """ + def get_auth_url(self, endpoint_url, method, **kwargs): + """ + Return the URL with OAuth params. + """ assert self.access_token, "need a valid access token for this step" assert self.access_token_secret, \ "need a valid access token secret for this step" @@ -306,14 +332,17 @@ def get_auth_url(self, endpoint_url, method): ('oauth_token', self.access_token) ] - sign_key = self.get_sign_key(self.consumer_secret, self.access_token_secret) + sign_key = self.get_sign_key( + self.consumer_secret, self.access_token_secret) - self.logger.debug('sign_key: %s' % sign_key ) + self.logger.debug('sign_key: %s' % sign_key) return self.add_params_sign(method, endpoint_url, params, sign_key) def discover_auth(self): - """ Discovers the location of authentication resourcers from the API""" + """ + Discover the location of authentication resourcers from the API. + """ discovery_url = self.requester.api_url response = self.requester.request('GET', discovery_url) @@ -331,9 +360,11 @@ def discover_auth(self): if not has_authentication_resources: raise UserWarning( ( - "Resopnse does not include location of authentication resources.\n" + "Response does not include location of authentication " + "resources.\n" "Resopnse: %s\n%s\n" - "Please check you have configured the Wordpress OAuth1 plugin correctly." + "Please check you have configured the Wordpress OAuth1 " + "plugin correctly." ) % (response, response.text[:500]) ) @@ -355,7 +386,8 @@ def get_request_token(self): ] request_token_url = self.authentication['oauth1']['request'] - request_token_url = self.add_params_sign("GET", request_token_url, params) + request_token_url = self.add_params_sign( + "GET", request_token_url, params) response = self.requester.get(request_token_url) self.logger.debug('get_request_token response: %s' % response.text) @@ -364,13 +396,21 @@ def get_request_token(self): try: self._request_token = resp_content['oauth_token'][0] except: - raise UserWarning("Could not parse request_token in response from %s : %s" \ - % (repr(response.request.url), UrlUtils.beautify_response(response))) + raise UserWarning( + "Could not parse request_token in response from %s : %s" + % ( + repr(response.request.url), + UrlUtils.beautify_response(response)) + ) try: self.request_token_secret = resp_content['oauth_token_secret'][0] except: - raise UserWarning("Could not parse request_token_secret in response from %s : %s" \ - % (repr(response.request.url), UrlUtils.beautify_response(response))) + raise UserWarning( + "Could not parse request_token_secret in response from %s : %s" + % ( + repr(response.request.url), + UrlUtils.beautify_response(response)) + ) return self._request_token, self.request_token_secret @@ -383,22 +423,31 @@ def parse_login_form_error(self, response, exc, **kwargs): error = login_form_soup.select_one('body#error-page') if error and error.stripped_strings: for stripped_string in error.stripped_strings: - if "plase solve this math problem" in stripped_string.lower(): - raise UserWarning("Can't log in if form has capcha ... yet") - raise UserWarning("could not parse login form error. %s " % str(error)) + if ( + "plase solve this math problem" + in stripped_string.lower() + ): + raise UserWarning( + "Can't log in if form has capcha ... yet") + raise UserWarning( + "could not parse login form error. %s " % str(error)) if response.status_code == 200: error = login_form_soup.select_one('div#login_error') if error and error.stripped_strings: for stripped_string in error.stripped_strings: if "invalid token" in stripped_string.lower(): - raise UserWarning("Invalid token: %s" % repr(kwargs.get('token'))) + raise UserWarning("Invalid token: %s" % + repr(kwargs.get('token'))) elif "invalid username" in stripped_string.lower(): - raise UserWarning("Invalid username: %s" % repr(kwargs.get('username'))) + raise UserWarning("Invalid username: %s" % + repr(kwargs.get('username'))) elif "the password you entered" in stripped_string.lower(): - raise UserWarning("Invalid password: %s" % repr(kwargs.get('password'))) - raise UserWarning("could not parse login form error. %s " % str(error)) + raise UserWarning("Invalid password: %s" % + repr(kwargs.get('password'))) + raise UserWarning( + "could not parse login form error. %s " % str(error)) raise UserWarning( - "Login form response was code %s. original error: \n%s" % \ + "Login form response was code %s. original error: \n%s" % (str(response.status_code), repr(exc)) ) @@ -416,7 +465,11 @@ def get_form_info(self, response, form_id): form_soup = response_soup.select_one('form#%s' % form_id) assert \ form_soup, "unable to find form with id=%s in %s " \ - % (form_id, (response_soup.prettify()).encode('ascii', errors='backslashreplace')) + % ( + form_id, + (response_soup.prettify()).encode('ascii', + errors='backslashreplace') + ) # print "login form: \n", form_soup.prettify() action = form_soup.get('action') @@ -425,13 +478,9 @@ def get_form_info(self, response, form_id): % (form_soup.prettify()).encode('ascii', errors='backslashreplace') form_data = OrderedDict() - for input_soup in form_soup.select('input') + form_soup.select('button'): - # print "input, class:%5s, id=%5s, name=%5s, value=%s" % ( - # input_soup.get('class'), - # input_soup.get('id'), - # input_soup.get('name'), - # input_soup.get('value') - # ) + for input_soup in ( + form_soup.select('input') + form_soup.select('button') + ): name = input_soup.get('name') if not name: continue @@ -444,8 +493,12 @@ def get_form_info(self, response, form_id): return action, form_data def get_verifier(self, request_token=None, wp_user=None, wp_pass=None): - """ pretends to be a browser, uses the authorize auth link, submits user creds to WP login form to get - verifier string from access token """ + """ + Get verifier string from access token. + + Pretends to be a browser, uses the authorize auth link, + submits user creds to WP login form. + """ if request_token is None: request_token = self.request_token @@ -457,22 +510,26 @@ def get_verifier(self, request_token=None, wp_user=None, wp_pass=None): wp_pass = self.wp_pass authorize_url = self.authentication['oauth1']['authorize'] - authorize_url = UrlUtils.add_query(authorize_url, 'oauth_token', request_token) + authorize_url = UrlUtils.add_query( + authorize_url, 'oauth_token', request_token) # we're using a different session from the usual API calls # (I think the headers are incompatible?) # self.requester.get(authorize_url) authorize_session = requests.Session() + authorize_session.headers.update( + {'User-Agent': "Wordpress API Client-Python/%s" % __version__}) login_form_response = authorize_session.get(authorize_url) login_form_params = { - 'username':wp_user, - 'password':wp_pass, - 'token':request_token + 'username': wp_user, + 'password': wp_pass, + 'token': request_token } try: - login_form_action, login_form_data = self.get_form_info(login_form_response, 'loginform') + login_form_action, login_form_data = self.get_form_info( + login_form_response, 'loginform') except AssertionError as exc: self.parse_login_form_error( login_form_response, exc, **login_form_params @@ -486,14 +543,16 @@ def get_verifier(self, request_token=None, wp_user=None, wp_pass=None): else: login_form_data[name] = values[0] - assert 'log' in login_form_data, 'input for user login did not appear on form' - assert 'pwd' in login_form_data, 'input for user password did not appear on form' - - # print "submitting login form to %s : %s" % (login_form_action, str(login_form_data)) + assert 'log' in login_form_data, \ + 'input for user login did not appear on form' + assert 'pwd' in login_form_data, \ + 'input for user password did not appear on form' - confirmation_response = authorize_session.post(login_form_action, data=login_form_data, allow_redirects=True) + confirmation_response = authorize_session.post( + login_form_action, data=login_form_data, allow_redirects=True) try: - authorize_form_action, authorize_form_data = self.get_form_info(confirmation_response, 'oauth1_authorize_form') + authorize_form_action, authorize_form_data = self.get_form_info( + confirmation_response, 'oauth1_authorize_form') except AssertionError as exc: self.parse_login_form_error( confirmation_response, exc, **login_form_params @@ -508,15 +567,21 @@ def get_verifier(self, request_token=None, wp_user=None, wp_pass=None): else: authorize_form_data[name] = values[0] - assert 'wp-submit' in login_form_data, 'authorize button did not appear on form' + assert 'wp-submit' in login_form_data, \ + 'authorize button did not appear on form' - final_response = authorize_session.post(authorize_form_action, data=authorize_form_data, allow_redirects=False) + final_response = authorize_session.post( + authorize_form_action, data=authorize_form_data, + allow_redirects=False) - assert \ - final_response.status_code == 302, \ - "was not redirected by authorize screen, was %d instead. something went wrong" \ + assert final_response.status_code == 302, \ + ( + "was not redirected by authorize screen, " + "was %d instead. something went wrong" % final_response.status_code - assert 'location' in final_response.headers, "redirect did not provide redirect location in header" + ) + assert 'location' in final_response.headers, \ + "redirect did not provide redirect location in header" final_location = final_response.headers['location'] @@ -526,9 +591,11 @@ def get_verifier(self, request_token=None, wp_user=None, wp_pass=None): final_location_queries = parse_qs(urlparse(final_location).query) - assert \ - 'oauth_verifier' in final_location_queries, \ - "oauth verifier not provided in final redirect: %s" % final_location + assert 'oauth_verifier' in final_location_queries, \ + ( + "oauth verifier not provided in final redirect: %s" + % final_location + ) self._oauth_verifier = final_location_queries['oauth_verifier'][0] return self._oauth_verifier @@ -545,11 +612,17 @@ def store_access_creds(self): if self.access_token_secret: creds['access_token_secret'] = self.access_token_secret if creds: + dirname = os.path.dirname(self.creds_store) + dirname = os.path.expanduser(dirname) + dirname = os.path.expandvars(dirname) + if not os.path.exists(dirname): + os.mkdir(dirname) with open(self.creds_store, 'w+') as creds_store_file: - json.dump(creds, creds_store_file, ensure_ascii=False) + StrUtils.to_binary( + json.dump(creds, creds_store_file, ensure_ascii=False)) def retrieve_access_creds(self): - """ retrieve the access_token and access_token_secret stored locally. """ + """Retrieve access_token / access_token_secret stored locally.""" if not self.creds_store: return @@ -576,14 +649,14 @@ def clear_stored_creds(self): with open(self.creds_store, 'w+') as creds_store_file: creds_store_file.write('') - def get_access_token(self, oauth_verifier=None): """ Uses the access authentication link to get an access token """ if oauth_verifier is None: oauth_verifier = self.oauth_verifier assert oauth_verifier, "Need an oauth verifier to perform this step" - assert self.request_token, "Need a valid request_token to perform this step" + assert self.request_token, \ + "Need a valid request_token to perform this step" params = self.get_params() params += [ @@ -591,10 +664,12 @@ def get_access_token(self, oauth_verifier=None): ('oauth_verifier', self.oauth_verifier) ] - sign_key = self.get_sign_key(self.consumer_secret, self.request_token_secret) + sign_key = self.get_sign_key( + self.consumer_secret, self.request_token_secret) access_token_url = self.authentication['oauth1']['access'] - access_token_url = self.add_params_sign("POST", access_token_url, params, sign_key) + access_token_url = self.add_params_sign( + "POST", access_token_url, params, sign_key) access_response = self.requester.post(access_token_url) @@ -612,10 +687,16 @@ def get_access_token(self, oauth_verifier=None): try: self._access_token = access_response_queries['oauth_token'][0] - self.access_token_secret = access_response_queries['oauth_token_secret'][0] + self.access_token_secret = \ + access_response_queries['oauth_token_secret'][0] except: - raise UserWarning("Could not parse access_token or access_token_secret in response from %s : %s" \ - % (repr(access_response.request.url), UrlUtils.beautify_response(access_response))) + raise UserWarning( + "Could not parse access_token or access_token_secret in " + "response from %s : %s" + % ( + repr(access_response.request.url), + UrlUtils.beautify_response(access_response)) + ) self.store_access_creds() diff --git a/wordpress/helpers.py b/wordpress/helpers.py index 3fef1d0..081af1b 100644 --- a/wordpress/helpers.py +++ b/wordpress/helpers.py @@ -1,27 +1,26 @@ # -*- coding: utf-8 -*- """ -Wordpress Hellpers Class +Wordpress Hellper Class """ __title__ = "wordpress-requests" -import re - +import json +import locale +import os import posixpath - -try: - from urllib.parse import urlencode, quote, unquote, parse_qs, parse_qsl, urlparse, urlunparse - from urllib.parse import ParseResult as URLParseResult -except ImportError: - from urllib import urlencode, quote, unquote - from urlparse import parse_qs, parse_qsl, urlparse, urlunparse - from urlparse import ParseResult as URLParseResult - +import re +import sys from collections import OrderedDict -from six.moves import reduce from bs4 import BeautifulSoup +from six import (PY2, PY3, binary_type, iterbytes, string_types, text_type, + unichr) +from six.moves import reduce +from six.moves.urllib.parse import ParseResult as URLParseResult +from six.moves.urllib.parse import (parse_qs, parse_qsl, quote, urlencode, + urlparse, urlunparse) class StrUtils(object): @@ -45,13 +44,61 @@ def decapitate(cls, *args, **kwargs): def eviscerate(cls, *args, **kwargs): return cls.remove_tail(*args, **kwargs) + @classmethod + def to_text(cls, string, encoding='utf-8', errors='replace'): + if isinstance(string, text_type): + return string + if isinstance(string, binary_type): + try: + return string.decode(encoding, errors=errors) + except TypeError: + return ''.join([ + unichr(c) for c in iterbytes(string) + ]) + return text_type(string) + + @classmethod + def to_binary(cls, string, encoding='utf8', errors='backslashreplace'): + if isinstance(string, binary_type): + return string + if not isinstance(string, text_type): + string = text_type(string) + return string.encode(encoding, errors) + + @classmethod + def jsonencode(cls, data, **kwargs): + if PY2: + for encoding in filter(None, { + kwargs.get('encoding', 'utf8'), + sys.getdefaultencoding(), + sys.getfilesystemencoding(), + locale.getpreferredencoding(), + 'utf8', + }): + try: + kwargs['encoding'] = encoding + return json.dumps(data, **kwargs) + except UnicodeDecodeError: + pass + kwargs.pop('encoding', None) + kwargs['cls'] = BytesJsonEncoder + return json.dumps(data, **kwargs) + + +class BytesJsonEncoder(json.JSONEncoder): + def default(self, obj): + + if isinstance(obj, binary_type): + return StrUtils.to_text(obj, errors='replace') + # Let the base class default method raise the TypeError + return json.JSONEncoder.default(self, obj) + class SeqUtils(object): @classmethod def filter_true(cls, seq): return [item for item in seq if item] - @classmethod def filter_unique_true(cls, list_a): response = [] @@ -63,8 +110,8 @@ def filter_unique_true(cls, list_a): @classmethod def combine_two_ordered_dicts(cls, dict_a, dict_b): """ - Combine OrderedDict a with b by starting with A and overwriting with items from b. - Attempt to preserve order + Combine OrderedDict a with b by starting with A and overwriting with + items from b. Attempt to preserve order """ if not dict_a: return dict_b if dict_b else OrderedDict() @@ -86,18 +133,22 @@ def combine_ordered_dicts(cls, *args): response = cls.combine_two_ordered_dicts(response, arg) return response + class UrlUtils(object): reg_netloc = r'(?P[^:]+)(:(?P\d+))?' @classmethod def get_query_list(cls, url): - """ returns the list of queries in the url """ + """Return the list of queries in the url.""" return parse_qsl(urlparse(url).query) @classmethod def get_query_dict_singular(cls, url): - """ return an ordered mapping from each key in the query string to a singular value """ + """ + Return an ordered mapping from each key in the query string to a + singular value. + """ query_list = cls.get_query_list(url) return OrderedDict(query_list) # query_dict = parse_qs(urlparse(url).query) @@ -122,7 +173,8 @@ def get_query_singular(cls, url, key, default=None): """ Gets the value of a single query in a url """ url_params = parse_qs(urlparse(url).query) values = url_params.get(key, [default]) - assert len(values) == 1, "ambiguous value, could not get singular for key: %s" % key + assert len(values) == 1, \ + "ambiguous value, could not get singular for key: %s" % key return values[0] @classmethod @@ -135,14 +187,6 @@ def del_query_singular(cls, url, key): url = cls.substitute_query(url, query_string) return url - # @classmethod - # def split_url_query(cls, url): - # """ Splits a url, returning the url without query and the query as a dict """ - # parsed_result = urlparse(url) - # parsed_query_dict = parse_qs(parsed_result.query) - # split_url = cls.substitute_query(url) - # return split_url, parsed_query_dict - @classmethod def split_url_query_singular(cls, url): query_dict_singular = cls.get_query_dict_singular(url) @@ -210,11 +254,13 @@ def beautify_response(response): """ Returns a beautified response in the default locale """ content_type = 'html' try: - content_type = getattr(response, 'headers', {}).get('Content-Type', content_type) + content_type = getattr(response, 'headers', {}).get( + 'Content-Type', content_type) except: pass if 'html' in content_type.lower(): - return BeautifulSoup(response.text, 'lxml').prettify().encode(errors='backslashreplace') + return BeautifulSoup(response.text, 'lxml').prettify().encode( + errors='backslashreplace') else: return response.text @@ -238,8 +284,8 @@ def remove_default_port(cls, url, defaults=None): """ Remove the port number from a URL if it is a default port. """ if defaults is None: defaults = { - 'http':80, - 'https':443 + 'http': 80, + 'https': 443 } urlparse_result = urlparse(url) @@ -291,7 +337,11 @@ def normalize_str(cls, string): @classmethod def normalize_params(cls, params): - """ Normalize parameters. works with RFC 5849 logic. params is a list of key, value pairs """ + """ + Normalize parameters. + + Works with RFC 5849 logic. params is a list of key, value pairs. + """ if isinstance(params, dict): params = params.items() params = [ @@ -306,7 +356,11 @@ def normalize_params(cls, params): @classmethod def sorted_params(cls, params): - """ Sort parameters. works with RFC 5849 logic. params is a list of key, value pairs """ + """ + Sort parameters. + + works with RFC 5849 logic. params is a list of key, value pairs + """ if isinstance(params, dict): params = params.items() @@ -348,4 +402,4 @@ def flatten_params(cls, params): params = cls.normalize_params(params) params = cls.sorted_params(params) params = cls.unique_params(params) - return "&".join(["%s=%s"%(key, value) for key, value in params]) + return "&".join(["%s=%s" % (key, value) for key, value in params]) diff --git a/wordpress/transport.py b/wordpress/transport.py index 3262070..3f05a97 100644 --- a/wordpress/transport.py +++ b/wordpress/transport.py @@ -7,24 +7,17 @@ __title__ = "wordpress-requests" import logging -from json import dumps as jsonencode from pprint import pformat -from requests import Request, Session +from requests import Session + from wordpress import __default_api__, __default_api_version__, __version__ from wordpress.helpers import SeqUtils, StrUtils, UrlUtils -try: - from urllib.parse import urlencode, quote, unquote, parse_qsl, urlparse, urlunparse - from urllib.parse import ParseResult as URLParseResult -except ImportError: - from urllib import urlencode, quote, unquote - from urlparse import parse_qsl, urlparse, urlunparse - from urlparse import ParseResult as URLParseResult - class API_Requests_Wrapper(object): """ provides a wrapper for making requests that handles session info """ + def __init__(self, url, **kwargs): self.logger = logging.getLogger(__name__) self.url = url @@ -33,6 +26,7 @@ def __init__(self, url, **kwargs): self.timeout = kwargs.get("timeout", 5) self.verify_ssl = kwargs.get("verify_ssl", True) self.session = Session() + self.headers = kwargs.get("headers", {}) @property def is_ssl(self): @@ -46,13 +40,17 @@ def api_url(self): ] return UrlUtils.join_components(components) + @property + def is_wp_json_v1(self): + return self.api == 'wp-json' and self.api_version == 'wp/v1' + @property def api_ver_url(self): components = [ self.url, self.api, ] - if self.api_version != 'wp/v1': + if not self.is_wp_json_v1: components += [ self.api_version ] @@ -70,7 +68,7 @@ def endpoint_url(self, endpoint): self.url, self.api ] - if self.api_version != 'wp/v1': + if not self.is_wp_json_v1: components += [ self.api_version ] @@ -79,27 +77,30 @@ def endpoint_url(self, endpoint): ] return UrlUtils.join_components(components) - def request(self, method, url, auth=None, params=None, data=None, **kwargs): + def request( + self, method, url, auth=None, params=None, data=None, **kwargs + ): headers = { "user-agent": "Wordpress API Client-Python/%s" % __version__, "accept": "application/json" } if data is not None: headers["content-type"] = "application/json;charset=utf-8" + headers = SeqUtils.combine_ordered_dicts( + headers, + self.headers + ) headers = SeqUtils.combine_ordered_dicts( headers, kwargs.get('headers', {}) ) - timeout = self.timeout - if 'timeout' in kwargs: - timeout = kwargs['timeout'] request_kwargs = dict( method=method, url=url, headers=headers, verify=self.verify_ssl, - timeout=timeout, + timeout=kwargs.get('timeout', self.timeout), ) request_kwargs.update(kwargs) if auth is not None: @@ -117,7 +118,8 @@ def request(self, method, url, auth=None, params=None, data=None, **kwargs): self.logger.debug("response_code:\n%s" % pformat(response.status_code)) try: response_json = response.json() - self.logger.debug("response_json:\n%s" % (pformat(response_json)[:1000])) + self.logger.debug("response_json:\n%s" % + (pformat(response_json)[:1000])) except ValueError: response_text = response.text self.logger.debug("response_text:\n%s" % (response_text[:1000])) @@ -130,10 +132,6 @@ def request(self, method, url, auth=None, params=None, data=None, **kwargs): response_links = response.links self.logger.debug("response_links:\n%s" % pformat(response_links)) - - - - return response def get(self, *args, **kwargs):