diff --git a/.gitignore b/.gitignore index 5b78e25..e6b16b7 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,33 @@ dist/ *.egg-info/ run.py run3.py +*.orig +.eggs/* +.cache/v/cache/lastfailed +pylint_report.txt +.pypirc +.tox/* +.pytest_cache/* +.python-version + # Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + # C extensions +*.so + # Distribution / packaging +.Python + # Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ + +\.vscode/settings\.json + +\.vscode/ diff --git a/.talismanrc b/.talismanrc new file mode 100644 index 0000000..5182285 --- /dev/null +++ b/.talismanrc @@ -0,0 +1,4 @@ +fileignoreconfig: +- filename: .travis.yml + checksum: 1f8ad739036010977c7663abd2a9348f00e7a25424f550f7d0cfc26423e667e5 + ignore_detectors: [] diff --git a/.travis.yml b/.travis.yml index 0a4416f..8ba408d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,14 +1,35 @@ language: python +sudo: required +env: + global: + CODECOV_TOKEN: + secure: ZXkVg6JLVPav6OJPzfUgIGD64e85N92tgpXA2nymIHfucCTGC5B4yniCTD49jz6xET1m1qRb04lvomCiQ7PeDoBW/LgJLIjXyJOW+2iA9VA3MdxwQF8Teu5W1J34wH4dm6nfn0KCNxhYRDTBkDgvXeUcXP5nNlo9w3sL+FKbAdXucwlL8ytcJXf641YhuGQpIrh1XGioBSNJ54BTRDXQXcRW76XaltxHCsbEv+fLH4yuahJdCTGjsr9cGdygAlo3FcLsqgkcjFCoNjg5UgBcPN8QPfWAppeIrmLRCq/q+p5KH2awPYqH0BL8jdTTmFElyGLmQBNnB6R5tI5HIx7+OCsw79mhXPVRgn3xkRj0OWRjzYlA+vW8JM2rEepixs9CRWtZJdC72oe1aytFb2cyVDfmotLwyuUqFI2ieQkyHgj0OLl1n1tcicRo8eS5RIB8mYicCm29lDrs/J6TFWSl/VqNUtZU+y54I/lv/fiFbRVjtMZ+PdwGHoigaVaIKeWe1TmlGWun7bh4Ov3jz62WtBlvuhz3LHMYD8OIuijot0HHqWsC1mlmZUvKoeSDYFXNLBBSAwMkkAfxxQM0PhMG3qUUirkd5xPJyBh1n8d4/KQzrkTblI7QzZgCwJE1r1L99XMs2/Ugf65gfTxRYFOMZGZUi0rzvXlu0Z7P5VrQr6c= + CC_TEST_REPORTER_ID: + secure: IceCOfujcdUwsTsg1328sbrvO/33N39/9pHxG/1VkMpqt47WDlG1lxbQGV78WuK7exip/JaDcB+iWPNJbyxGirOK0Co64O61iZcKUEbH6wjWxjeZ2yuhpyxyrnUF9OWmk8op4ewkU2ww6tzXQT2Lo+b/g8ryTFag0o8roA9unCj5p42aywZ927UIagaVqQh0sJ/qUUCmwAvGIB8bqKL8nxg97PwgBy38mH5PWE3Bqkm0FBpreKb1x4m4n9wZE29noiImT0xEIZMCwZ4zUPzbpKQmdUe1tHWf0hoQuVPWHLCMwqU2AW/PiY3CqlaAiUX71450WaKDrjbBtUvDl73YaUdiroWoL4rrm2UjlNGFbpEoqEbdBn2HLEefCw+zoo8YEPxieXVUQgmRygGpgoHTrRFqkReLA2BxV6F7IeMZ2AtOW0OXejzjcOEBWnRFs2sF6EqQZL8decye3P5CPcKVzNg28QEBBtdYgYT02qlY8JFv8N6KU9qNMjMvT9yQU8lfbV0iteMtdZl4coinNR34hNf9jMY+uj3/44kHgooygur/A9tHgQt/9/VTpS//y79gG6+ozllwzFQjzWE1AqLUPnPtSJZpvF8F5mmnww/sf/pjsV7jvA9VwyF/paO2JicGIN+bw86FNydXRHP3mmEAfJXOBiJVr5xPD2Wi1Q8Dw3k= +services: +- docker python: - - "2.6" - - "2.7" - - "3.2" - - "3.3" - - "3.4" - - "nightly" -# command to install dependencies +- '2.7' +- '3.6' +- nightly install: - - pip install . - - pip install -r requirements-test.txt -# command to run tests -script: nosetests +- docker-compose up -d +- pip install . +- pip install -r requirements-test.txt +- docker exec -it wpapipython_woocommerce_1 bash -c 'until [ -f .done ]; do sleep + 1; done; echo "complete"' +before_script: +- curl -L https://codeclimate.com/downloads/test-reporter/test-reporter-latest-linux-amd64 > ./cc-test-reporter +- chmod +x ./cc-test-reporter +- "./cc-test-reporter before-build" +script: +- py.test --cov=wordpress tests +after_success: +- codecov +- "./cc-test-reporter after-build --exit-code $TRAVIS_TEST_RESULT --debug" +deploy: + provider: pypi + user: derwents + skip_existing: true + password: + secure: Hj0CU4CzrA1WhZ/lHW7sDYXjmYSBnEfooRPwgoIMbjk+1G1THNk5A0efR6Vmjh5Ugm8vejHRRCUuYY6UwFE86KbGRUCModx/9rSeoLSKJ1DPIryHNRXgDDJGx2zt8fbvOwnzY7vFcvGDnN65EkGcZxekdwi4kNGQgp54vCQzjvf+dxo4A9+YuHPl8JzQekHQP3uU4VIMGgKSNpMfCdTraBzO8rAX0EuBTEEjczDJn3X77D/vSgqgA6G8oU0tcgnpd9ryydSHoc1icU70D0NUvHTLRr/RNVfIk+QVEcvcFedg4Wo81rcXxta1UQTmyIIZGBGNfNo/68o9GqsD0Edc4oHfZkMqainknaKfke8LtqdEEkOSwuhHX4NdzwBnMCV5rMud2W42hikiJKEPy3cGZJjiabUmEG8IpI1EsiMz5zCod/OVPGq87n4towvnsIpIzyGC7JkbSxHn+NYASkIkX38lhiPzYpgW7VZXRAUPebkxW8P2Bmyx0A8Sli2ijAkx04ul6jk1/HGCB8Y4EtQ9GuefH5pwfV1fhb2lEf56Dyd+REdZia/jiU+dKoPXYv+ZmM1ynrQVwn1/ZCHZejLOzhCGR5Dxk2yjT51hKPHcNzKboR++XiiKML1/cPSTDcGSamADazKuLMqJkP+CWRPkwts+tKBOka0YLCVJwD4ZFgU= diff --git a/README.rst b/README.rst index 5033d80..179d2c7 100644 --- a/README.rst +++ b/README.rst @@ -1,80 +1,233 @@ -WooCommerce API - Python Client +**A note from the author:** I no longer do Wordpress work, so I won't have the time to adequately maintain this repo. If you would like to maintain a fork of this repo, and want me to link to your fork here, please `let me know `_. + +One such fork is https://github.com/Synoptik-Labs/wp-api-python + +thanks! + +Wordpress API - Python Client =============================== -A Python wrapper for the WooCommerce REST API. Easily interact with the WooCommerce REST API using this library. +.. image:: https://travis-ci.org/derwentx/wp-api-python.svg?branch=master + :target: https://travis-ci.org/derwentx/wp-api-python + +.. image:: https://api.codeclimate.com/v1/badges/4df627621037b2df7e5d/maintainability + :target: https://codeclimate.com/github/derwentx/wp-api-python/maintainability + :alt: Maintainability + +.. image:: https://api.codeclimate.com/v1/badges/4df627621037b2df7e5d/test_coverage + :target: https://codeclimate.com/github/derwentx/wp-api-python/test_coverage + :alt: Test Coverage + +.. image:: https://snyk.io/test/github/derwentx/wp-api-python/badge.svg?targetFile=requirements.txt + :target: https://snyk.io/test/github/derwentx/wp-api-python?targetFile=requirements.txt + +.. image:: https://badge.fury.io/py/wordpress-api.svg + :target: https://badge.fury.io/py/wordpress-api -.. image:: https://secure.travis-ci.org/woocommerce/wc-api-python.svg - :target: http://travis-ci.org/woocommerce/wc-api-python +A Python wrapper for the Wordpress and WooCommerce REST APIs with oAuth1a 3leg support. -.. image:: https://img.shields.io/pypi/v/woocommerce.svg - :target: https://pypi.python.org/pypi/WooCommerce +Supports the Wordpress REST API v1-2, WooCommerce REST API v1-3 and WooCommerce WP-API v1-2 (with automatic OAuth3a handling). +Forked from the excellent WooCommerce API written by Claudio Sanches and modified to work with Wordpress: https://github.com/woocommerce/wc-api-python +I created this fork because I prefer the way that the wc-api-python client interfaces with +the Wordpress API compared to the existing python client, https://pypi.python.org/pypi/wordpress_json +which does not support OAuth authentication, only Basic Authentication (very unsecure) + +Any comments about how you're using the API and suggestions about how this repository could be improved are welcome :). +You can find my contact info in my GitHub profile. + +Roadmap +------- + +- [x] Create initial fork +- [x] Implement 3-legged OAuth on Wordpress client +- [x] Better local storage of OAuth credentials to stop unnecessary API keys being generated +- [x] Support image upload to WC Api +- [ ] Better handling of timeouts with a back-off +- [ ] Implement iterator for convenient access to API items + +Requirements +------------ + +Wordpress version 4.7+ comes pre-installed with REST API v2, so you don't need to have the WP REST API plugin if you have the latest Wordpress. + +You should have the following plugins installed on your wordpress site: + +- **WP REST API** (only required for WP < v4.7, recommended version: 2.0+) +- **WP REST API - OAuth 1.0a Server** (optional, if you want oauth within the wordpress API. https://github.com/WP-API/OAuth1) +- **WP REST API - Meta Endpoints** (optional) +- **WP API Basic Auth** https://github.com/WP-API/Basic-Auth (for image uploading) +- **WooCommerce** (optional, if you want to use the WooCommerce API) + +The following python packages are also used by the package + +- **requests** +- **beautifulsoup** Installation ------------ +Install with pip + .. code-block:: bash - pip install woocommerce + pip install wordpress-api + +Download this repo and use setuptools to install the package + +.. code-block:: bash + + pip install setuptools + git clone https://github.com/derwentx/wp-api-python + python setup.py install + +Testing +------- + +Some of the tests make API calls to a dockerized woocommerce container. Don't +worry! It's really simple to set up. You just need to install docker and run + +.. code-block:: bash + + docker-compose up -d + # this just waits until the docker container is set up and exits + docker exec -it wpapipython_woocommerce_1 bash -c 'until [ -f .done ]; do sleep 1; done; echo "complete"' + +Then you can test with: + +.. code-block:: bash + + pip install -r requirements-test.txt + python setup.py test + +Publishing +---------- + +Note to self because I keep forgetting how to use Twine >_< + +.. code-block:: bash + + python setup.py sdist bdist_wheel + # Check that you've updated changelog + twine upload dist/wordpress-api-$(python setup.py --version) -r pypitest + twine upload dist/wordpress-api-$(python setup.py --version) -r pypi + Getting started --------------- -Generate API credentials (Consumer Key & Consumer Secret) following this instructions http://docs.woocommerce.com/document/woocommerce-rest-api/. +Generate API credentials (Consumer Key & Consumer Secret) following these instructions: http://v2.wp-api.org/guide/authentication/ -Check out the WooCommerce API endpoints and data that can be manipulated in http://woocommerce.github.io/woocommerce-rest-api-docs/. +Simply go to Users -> Applications and create an Application, e.g. "REST API". +Enter a callback URL that you will be able to remember later such as "http://example.com/oauth1_callback" (not really important for this client). +Store the resulting Key and Secret somewhere safe. + +Check out the Wordpress API endpoints and data that can be manipulated in http://v2.wp-api.org/reference/. Setup ----- -Setup for the old WooCommerce API v3: +Wordpress API with Basic authentication: +---- +(Note: requires Basic Authentication plugin) + +.. code-block:: python + + from wordpress import API + + wpapi = API( + url="http://example.com", + api="wp-json", + version='wp/v2', + wp_user="XXXX", + wp_pass="XXXX", + basic_auth = True, + user_auth = True, + ) + +WP REST API v2: +---- +(Note: the username and password are required so that it can fill out the oauth request token form automatically for you. +Requires OAuth 1.0a plugin. ) .. code-block:: python - from woocommerce import API + #... + + wpapi = API( + url="http://example.com", + consumer_key="XXXXXXXXXXXX", + consumer_secret="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", + api="wp-json", + version="wp/v2", + wp_user="XXXX", + wp_pass="XXXX", + oauth1a_3leg=True, + creds_store="~/.wc-api-creds.json" + ) + +Legacy WooCommerce API v3: +---- + +.. code-block:: python + + #... wcapi = API( url="http://example.com", consumer_key="ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", - consumer_secret="cs_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + consumer_secret="cs_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", + api="wc-api", + version="v3" ) -Setup for the new WP REST API integration (WooCommerce 2.6 or later): +New WC REST API: +---- +Note: oauth1a 3legged works with Wordpress but not with WooCommerce. However oauth1a signing still works. +If you try to do oauth1a_3leg with WooCommerce it just says "consumer_key not valid", even if it is valid. .. code-block:: python - from woocommerce import API + #... wcapi = API( url="http://example.com", consumer_key="ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", consumer_secret="cs_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", - wp_api=True, - version="wc/v1" + api="wp-json", + version="wc/v2", + callback='http://127.0.0.1/oauth1_callback' ) + Options ~~~~~~~ -+-----------------------+-------------+----------+-------------------------------------------------------------------------------------------------------+ -| Option | Type | Required | Description | -+=======================+=============+==========+=======================================================================================================+ -| ``url`` | ``string`` | yes | Your Store URL, example: http://woo.dev/ | -+-----------------------+-------------+----------+-------------------------------------------------------------------------------------------------------+ -| ``consumerKey`` | ``string`` | yes | Your API consumer key | -+-----------------------+-------------+----------+-------------------------------------------------------------------------------------------------------+ -| ``consumerSecret`` | ``string`` | yes | Your API consumer secret | -+-----------------------+-------------+----------+-------------------------------------------------------------------------------------------------------+ -| ``wp_api`` | ``bool`` | no | Allow requests to the WP REST API (WooCommerce 2.6 or later) | -+-----------------------+-------------+----------+-------------------------------------------------------------------------------------------------------+ -| ``version`` | ``string`` | no | API version, default is ``v3`` | -+-----------------------+-------------+----------+-------------------------------------------------------------------------------------------------------+ -| ``timeout`` | ``integer`` | no | Connection timeout, default is ``5`` | -+-----------------------+-------------+----------+-------------------------------------------------------------------------------------------------------+ -| ``verify_ssl`` | ``bool`` | no | Verify SSL when connect, use this option as ``False`` when need to test with self-signed certificates | -+-----------------------+-------------+----------+-------------------------------------------------------------------------------------------------------+ -| ``query_string_auth`` | ``bool`` | no | Force Basic Authentication as query string when ``True`` and using under HTTPS, default is ``False`` | -+-----------------------+-------------+----------+-------------------------------------------------------------------------------------------------------+ ++-----------------------+-------------+----------+------------------------------------------------------------------------------------------------------------------+ +| Option | Type | Required | Description | ++=======================+=============+==========+==================================================================================================================+ +| ``url`` | ``string`` | yes | Your Store URL, example: http://wp.dev/ | ++-----------------------+-------------+----------+------------------------------------------------------------------------------------------------------------------+ +| ``consumerKey`` | ``string`` | yes | Your API consumer key | ++-----------------------+-------------+----------+------------------------------------------------------------------------------------------------------------------+ +| ``consumerSecret`` | ``string`` | yes | Your API consumer secret | ++-----------------------+-------------+----------+------------------------------------------------------------------------------------------------------------------+ +| ``api`` | ``string`` | no | Determines which api to use, defaults to ``wp-json``, can be arbitrary: ``wc-api``, ``oembed`` | ++-----------------------+-------------+----------+------------------------------------------------------------------------------------------------------------------+ +| ``version`` | ``string`` | no | API version, default is ``wp/v2``, can be ``v3`` or ``wc/v1`` if using ``wc-api`` | ++-----------------------+-------------+----------+------------------------------------------------------------------------------------------------------------------+ +| ``timeout`` | ``integer`` | no | Connection timeout, default is ``5`` | ++-----------------------+-------------+----------+------------------------------------------------------------------------------------------------------------------+ +| ``verify_ssl`` | ``bool`` | no | Verify SSL when connect, use this option as ``False`` when need to test with self-signed certificates | ++-----------------------+-------------+----------+------------------------------------------------------------------------------------------------------------------+ +| ``basic_auth`` | ``bool`` | no | Force Basic Authentication, can be through query string or headers (default) | ++-----------------------+-------------+----------+------------------------------------------------------------------------------------------------------------------+ +| ``query_string_auth`` | ``bool`` | no | Use query string for Basic Authentication when ``True`` and using HTTPS, default is ``False`` which uses header | ++-----------------------+-------------+----------+------------------------------------------------------------------------------------------------------------------+ +| ``oauth1a_3leg`` | ``string`` | no | use oauth1a 3-legged authentication | ++-----------------------+-------------+----------+------------------------------------------------------------------------------------------------------------------+ +| ``creds_store`` | ``string`` | no | JSON file where oauth verifier is stored (only used with OAuth_3Leg) | ++-----------------------+-------------+----------+------------------------------------------------------------------------------------------------------------------+ Methods ------- @@ -82,7 +235,7 @@ Methods +--------------+----------------+------------------------------------------------------------------+ | Params | Type | Description | +==============+================+==================================================================+ -| ``endpoint`` | ``string`` | WooCommerce API endpoint, example: ``customers`` or ``order/12`` | +| ``endpoint`` | ``string`` | API endpoint, example: ``posts`` or ``user/12`` | +--------------+----------------+------------------------------------------------------------------+ | ``data`` | ``dictionary`` | Data that will be converted to JSON | +--------------+----------------+------------------------------------------------------------------+ @@ -112,6 +265,25 @@ OPTIONS - ``.options(endpoint)`` +Upload an image +----- + +(Note: this only works on WP API with the Basic Auth plugin enabled: https://github.com/WP-API/Basic-Auth ) + +.. code-block:: python + + assert os.path.exists(img_path), "img should exist" + data = open(img_path, 'rb').read() + filename = os.path.basename(img_path) + _, extension = os.path.splitext(filename) + headers = { + 'cache-control': 'no-cache', + 'content-disposition': 'attachment; filename=%s' % filename, + 'content-type': 'image/%s' % extension + } + endpoint = "/media" + return wpapi.post(endpoint, data, headers=headers) + Response -------- @@ -121,7 +293,8 @@ Example of returned data: .. code-block:: bash - >>> r = wcapi.get("products") + >>> from wordpress import api as wpapi + >>> r = wpapi.get("posts") >>> r.status_code 200 >>> r.headers['content-type'] @@ -129,61 +302,80 @@ Example of returned data: >>> r.encoding 'UTF-8' >>> r.text - u'{"products":[{"title":"Flying Ninja","id":70,...' // Json text + u'{"posts":[{"title":"Flying Ninja","id":70,...' // Json text >>> r.json() - {u'products': [{u'sold_individually': False,... // Dictionary data + {u'posts': [{u'sold_individually': False,... // Dictionary data +A note on DELETE requests. +===== -Changelog ---------- +The extra keyword arguments passed to the function of a `__request` call (such as `.delete()`) to a `wordpress.API` object are used to modify a `Requests.request` call, this is to allow you to specify custom parameters to modify how the request is made such as `headers`. At the moment it only passes the `headers` parameter to requests, but if I see a use case for it, I can forward more of the parameters to `Requests`. +The `delete` function doesn’t accept a data object because a HTTP DELETE request does not typically have a payload, and some implementations of a HTTP server would reject a DELETE request that has a payload. +You can still pass api request parameters in the query string of the URL. I would suggest using a library like `urlparse` / `urllib.parse` to modify the query string if you are automatically deleting users. +According the the [documentation](https://developer.wordpress.org/rest-api/reference/users/#delete-a-user) for deleting a user, you need to pass the `force` and `reassign` parameters to the API, which can be done by appending them to the endpoint URL. +.. code-block:: python + >>> response = wpapi.delete(‘/users/?reassign=&force=true’) + >>> response.json() + {“deleted”:true, ... } -1.2.1 - 2016/12/14 -~~~~~~~~~~~~~~~~~~ +A Note on Encoding +==== -- Fixed WordPress 4.7 compatibility. +In Python2, make sure to only `POST` unicode string objects or strings that +have been correctly encoded as utf-8. Serializing objects containing non-utf8 +byte strings in Python2 is broken by importing `unicode_literals` from +`__future__` because of a bug in `json.dumps`. You may be able to get around +this problem by serializing the data yourself. -1.2.0 - 2016/06/22 -~~~~~~~~~~~~~~~~~~ -- Added option ``query_string_auth`` to allow Basic Auth as query strings. +Changelog +--------- -1.1.1 - 2016/06/03 +1.2.8 - 2018/10/13 ~~~~~~~~~~~~~~~~~~ +- Much better python3 support +- really good tests +- added NoAuth option for adding custom headers (like JWT) -- Fixed oAuth signature for WP REST API. - -1.1.0 - 2016/05/09 +1.2.7 - 2018/06/18 ~~~~~~~~~~~~~~~~~~ +- Don't crash on "-1" response from API. +- Fix windows encoding error -- Added support for WP REST API. -- Added method to do HTTP OPTIONS requests. - -1.0.5 - 2015/12/07 +1.2.6 - 2018/01/29 ~~~~~~~~~~~~~~~~~~ +- Better Python3 support +- Tested on Python v3.6.2 and v2.7.13 -- Fixed oAuth filters sorting. - -1.0.4 - 2015/09/25 +1.2.5 - 2017/12/07 ~~~~~~~~~~~~~~~~~~ +- Better UTF-8 support -- Implemented ``timeout`` argument for ``API`` class. - -1.0.3 - 2015/08/07 +1.2.4 - 2017/10/01 ~~~~~~~~~~~~~~~~~~ +- Support for image upload +- More accurate documentation of WP authentication methods -- Forced utf-8 encoding on ``API.__request()`` to avoid ``UnicodeDecodeError`` - -1.0.2 - 2015/08/05 +1.2.3 - 2017/09/07 ~~~~~~~~~~~~~~~~~~ +- Better local storage of OAuth creds to stop unnecessary API keys being generated +- Improve parsing of API errors to display much more useful error information -- Fixed handler for query strings - -1.0.1 - 2015/07/13 +1.2.2 - 2017/06/16 ~~~~~~~~~~~~~~~~~~ +- support basic auth without https +- rename oauth module to auth (since auth covers oauth and basic auth) +- tested with latest versions of WP and WC -- Fixed support for Python 2.6 +1.2.1 - 2016/12/13 +~~~~~~~~~~~~~~~~~~ +- tested to handle complex queries like filter[limit] +- fix: Some edge cases where queries were out of order causing signature mismatch +- hardened helper and api classes and added corresponding test cases -1.0.1 - 2015/07/12 +1.2.0 - 2016/09/28 ~~~~~~~~~~~~~~~~~~ -- Initial version +- Initial fork +- Implemented 3-legged OAuth +- Tested with pagination diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..4dc59e1 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,40 @@ +version: "2" +services: + db: + image: mariadb + environment: + MYSQL_ALLOW_EMPTY_PASSWORD: "yes" + MYSQL_DATABASE: "wordpress" + MYSQL_ROOT_PASSWORD: "" + ports: + - "8082:3306" + woocommerce: + image: derwentx/woocommerce-api + environment: + WORDPRESS_DB_HOST: "db" + WORDPRESS_DB_NAME: "wordpress" + WORDPRESS_DB_PASSWORD: "" + WORDPRESS_DB_USER: "root" + WORDPRESS_SITE_URL: "http://localhost:8083/" + WORDPRESS_SITE_TITLE: "API Test" + WORDPRESS_ADMIN_USER: "admin" + WORDPRESS_ADMIN_PASSWORD: "admin" + WORDPRESS_ADMIN_EMAIL: "admin@example.com" + WORDPRESS_DEBUG: 1 + WORDPRESS_PLUGINS: "https://github.com/WP-API/Basic-Auth/archive/master.zip" + WORDPRESS_API_APPLICATION: "Test" + WORDPRESS_API_DESCRIPTION: "Test Application" + WORDPRESS_API_CALLBACK: "http://127.0.0.1/oauth1_callback" + WORDPRESS_API_KEY: "tYG1tAoqjBEM" + WORDPRESS_API_SECRET: "s91fvylVrqChwzzDbEJHEWyySYtAmlIsqqYdjka1KyVDdAyB" + WOOCOMMERCE_TEST_DATA: "1" + WOOCOMMERCE_TEST_DATA_URL: "https://raw.githubusercontent.com/woocommerce/woocommerce/c81b3cf1655f9983db37bff750cb5baae3c3236e/dummy-data/dummy-products.xml" + WOOCOMMERCE_CONSUMER_KEY: "ck_659f6994ae88fed68897f9977298b0e19947979a" + WOOCOMMERCE_CONSUMER_SECRET: "cs_9421d39290f966172fef64ae18784a2dc7b20976" + links: + - db:mysql + ports: + - "8083:80" + depends_on: + - db + command: apache2-foreground diff --git a/requirements-test.txt b/requirements-test.txt index 5f4dc7e..a6d0ee5 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,3 +1,6 @@ -r requirements.txt -httmock==1.2.3 -nose==1.3.7 +httmock +pytest +pytest-cov<2.6.0 +coverage +codecov diff --git a/requirements.txt b/requirements.txt index d090df9..da0aed0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,10 @@ -requests==2.7.0 -ordereddict==1.1 +six +ordereddict +requests_oauthlib +pathlib2 +funcsigs +requests +more_itertools +colorama +beautifulsoup4 +urllib3>=1.24.3 diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..224224d --- /dev/null +++ b/setup.cfg @@ -0,0 +1,15 @@ +[aliases] +test=pytest +[tool:pytest] +addopts = --verbose +python_files = tests/test_*.py +[pylama] +skip=\.*,build/*,dist/*,*.egg-info +[pylama:tests.py] +disable=D +[pylama:radon] +complexity=20 +[pylama:mccabe] +complexity=20 +[pylama:pycodestyle] +max_line_length=100 diff --git a/setup.py b/setup.py index 5712291..8c97734 100644 --- a/setup.py +++ b/setup.py @@ -2,41 +2,60 @@ # -*- coding: utf-8 -*- """ Setup module """ -from setuptools import setup import os import re +from io import open +from setuptools import setup # Get version from __init__.py file VERSION = "" -with open("woocommerce/__init__.py", "r") as fd: - VERSION = re.search(r"^__version__\s*=\s*['\"]([^\"]*)['\"]", fd.read(), re.MULTILINE).group(1) +with open("wordpress/__init__.py", "r") as fd: + VERSION = re.search( + r"^__version__\s*=\s*['\"]([^\"]*)['\"]", fd.read(), re.MULTILINE + ).group(1) if not VERSION: raise RuntimeError("Cannot find version information") # Get long description -README = open(os.path.join(os.path.dirname(__file__), "README.rst")).read() +README = open(os.path.join(os.path.dirname(__file__), + "README.rst"), encoding="utf8").read() # allow setup.py to be run from any path os.chdir(os.path.normpath(os.path.join(os.path.abspath(__file__), os.pardir))) setup( - name="WooCommerce", + name="wordpress-api", version=VERSION, - description="A Python wrapper for the WooCommerce REST API", + description=( + "A Python wrapper for the Wordpress and WooCommerce REST APIs " + "with oAuth1a 3leg support" + ), long_description=README, - author="Claudio Sanches @ Automattic", - url="https://github.com/woocommerce/wc-api-python", + author="Claudio Sanches @ Automattic, forked by Derwent @ Laserphile", + url="https://github.com/derwentx/wp-api-python", license="MIT License", packages=[ - "woocommerce" + "wordpress" ], include_package_data=True, platforms=['any'], install_requires=[ "requests", - "ordereddict" + "requests_oauthlib", + "ordereddict", + "beautifulsoup4", + 'lxml', + 'six', + ], + setup_requires=[ + 'pytest-runner', + ], + tests_require=[ + 'httmock', + 'pytest', + 'six' ], classifiers=[ "Development Status :: 5 - Production/Stable", @@ -44,11 +63,9 @@ "Natural Language :: English", "License :: OSI Approved :: MIT License", "Programming Language :: Python", - "Programming Language :: Python :: 2.6", "Programming Language :: Python :: 2.7", - "Programming Language :: Python :: 3.2", - "Programming Language :: Python :: 3.3", - "Programming Language :: Python :: 3.4", + "Programming Language :: Python :: 3.6", "Topic :: Software Development :: Libraries :: Python Modules" ], + keywords='python wordpress woocommerce api development' ) diff --git a/tests.py b/tests.py deleted file mode 100644 index ddae9df..0000000 --- a/tests.py +++ /dev/null @@ -1,136 +0,0 @@ -""" API Tests """ -import unittest -import woocommerce -from woocommerce import oauth -from httmock import all_requests, HTTMock - - -class WooCommerceTestCase(unittest.TestCase): - """Test case for the client methods.""" - - def setUp(self): - self.consumer_key = "ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" - self.consumer_secret = "cs_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" - self.api = woocommerce.API( - url="http://woo.test", - consumer_key=self.consumer_key, - consumer_secret=self.consumer_secret - ) - - def test_version(self): - """ Test default version """ - api = woocommerce.API( - url="https://woo.test", - consumer_key=self.consumer_key, - consumer_secret=self.consumer_secret - ) - - self.assertEqual(api.version, "v3") - - def test_non_ssl(self): - """ Test non-ssl """ - api = woocommerce.API( - url="http://woo.test", - consumer_key=self.consumer_key, - consumer_secret=self.consumer_secret - ) - self.assertFalse(api.is_ssl) - - def test_with_ssl(self): - """ Test non-ssl """ - api = woocommerce.API( - url="https://woo.test", - consumer_key=self.consumer_key, - consumer_secret=self.consumer_secret - ) - self.assertTrue(api.is_ssl, True) - - def test_with_timeout(self): - """ Test non-ssl """ - api = woocommerce.API( - url="https://woo.test", - consumer_key=self.consumer_key, - consumer_secret=self.consumer_secret, - timeout=10, - ) - self.assertEqual(api.timeout, 10) - - @all_requests - def woo_test_mock(*args, **kwargs): - """ URL Mock """ - return {'status_code': 200, - 'content': 'OK'} - - with HTTMock(woo_test_mock): - # call requests - status = api.get("products").status_code - self.assertEqual(status, 200) - - def test_get(self): - """ Test GET requests """ - @all_requests - def woo_test_mock(*args, **kwargs): - """ URL Mock """ - return {'status_code': 200, - 'content': 'OK'} - - with HTTMock(woo_test_mock): - # call requests - status = self.api.get("products").status_code - self.assertEqual(status, 200) - - def test_post(self): - """ Test POST requests """ - @all_requests - def woo_test_mock(*args, **kwargs): - """ URL Mock """ - return {'status_code': 201, - 'content': 'OK'} - - with HTTMock(woo_test_mock): - # call requests - status = self.api.post("products", {}).status_code - self.assertEqual(status, 201) - - def test_put(self): - """ Test PUT requests """ - @all_requests - def woo_test_mock(*args, **kwargs): - """ URL Mock """ - return {'status_code': 200, - 'content': 'OK'} - - with HTTMock(woo_test_mock): - # call requests - status = self.api.put("products", {}).status_code - self.assertEqual(status, 200) - - def test_delete(self): - """ Test DELETE requests """ - @all_requests - def woo_test_mock(*args, **kwargs): - """ URL Mock """ - return {'status_code': 200, - 'content': 'OK'} - - with HTTMock(woo_test_mock): - # call requests - status = self.api.delete("products").status_code - self.assertEqual(status, 200) - - def test_oauth_sorted_params(self): - """ Test order of parameters for OAuth signature """ - def check_sorted(keys, expected): - params = oauth.OrderedDict() - for key in keys: - params[key] = '' - - ordered = list(oauth.OAuth.sorted_params(params).keys()) - self.assertEqual(ordered, expected) - - check_sorted(['a', 'b'], ['a', 'b']) - check_sorted(['b', 'a'], ['a', 'b']) - check_sorted(['a', 'b[a]', 'b[b]', 'b[c]', 'c'], ['a', 'b[a]', 'b[b]', 'b[c]', 'c']) - check_sorted(['a', 'b[c]', 'b[a]', 'b[b]', 'c'], ['a', 'b[c]', 'b[a]', 'b[b]', 'c']) - check_sorted(['d', 'b[c]', 'b[a]', 'b[b]', 'c'], ['b[c]', 'b[a]', 'b[b]', 'c', 'd']) - check_sorted(['a1', 'b[c]', 'b[a]', 'b[b]', 'a2'], ['a1', 'a2', 'b[c]', 'b[a]', 'b[b]']) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..3fe6c03 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,36 @@ +""" +Test case module. +""" + +from time import time +import sys +import logging +import pdb +import functools +import traceback +import copy + +CURRENT_TIMESTAMP = int(time()) +SHITTY_NONCE = "" +DEFAULT_ENCODING = sys.getdefaultencoding() + + +def debug_on(*exceptions): + if not exceptions: + exceptions = (AssertionError, ) + + def decorator(f): + @functools.wraps(f) + def wrapper(*args, **kwargs): + prev_root = copy(logging.root) + try: + logging.basicConfig(level=logging.DEBUG) + return f(*args, **kwargs) + except exceptions: + info = sys.exc_info() + traceback.print_exception(*info) + pdb.post_mortem(info[2]) + finally: + logging.root = prev_root + return wrapper + return decorator diff --git a/tests/data/test.jpg b/tests/data/test.jpg new file mode 100644 index 0000000..ea01a22 Binary files /dev/null and b/tests/data/test.jpg differ diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000..e9c39f8 --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,535 @@ +""" API Tests """ +from __future__ import unicode_literals + +import os +import random +import unittest + +import requests + +import six +import wordpress +from httmock import HTTMock, all_requests +from six import text_type +from wordpress import __default_api__, __default_api_version__, auth +from wordpress.api import API +from wordpress.auth import Auth +from wordpress.helpers import StrUtils, UrlUtils + +from . import CURRENT_TIMESTAMP, SHITTY_NONCE + + +class WordpressTestCase(unittest.TestCase): + """Test case for the mocked client methods.""" + + def setUp(self): + self.consumer_key = "ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + self.consumer_secret = "cs_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + self.api = wordpress.API( + url="http://woo.test", + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret + ) + + def test_api(self): + """ Test default API """ + api = wordpress.API( + url="https://woo.test", + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret + ) + + self.assertEqual(api.namespace, __default_api__) + + def test_version(self): + """ Test default version """ + api = wordpress.API( + url="https://woo.test", + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret + ) + + self.assertEqual(api.version, __default_api_version__) + + def test_non_ssl(self): + """ Test non-ssl """ + api = wordpress.API( + url="http://woo.test", + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret + ) + self.assertFalse(api.is_ssl) + + def test_with_ssl(self): + """ Test non-ssl """ + api = wordpress.API( + url="https://woo.test", + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret + ) + self.assertTrue(api.is_ssl, True) + + def test_with_timeout(self): + """ Test non-ssl """ + api = wordpress.API( + url="https://woo.test", + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret, + timeout=10, + ) + self.assertEqual(api.timeout, 10) + + @all_requests + def woo_test_mock(*args, **kwargs): + """ URL Mock """ + return {'status_code': 200, + 'content': b'OK'} + + with HTTMock(woo_test_mock): + # call requests + status = api.get("products").status_code + self.assertEqual(status, 200) + + def test_get(self): + """ Test GET requests """ + @all_requests + def woo_test_mock(*args, **kwargs): + """ URL Mock """ + return {'status_code': 200, + 'content': b'OK'} + + with HTTMock(woo_test_mock): + # call requests + status = self.api.get("products").status_code + self.assertEqual(status, 200) + + def test_post(self): + """ Test POST requests """ + @all_requests + def woo_test_mock(*args, **kwargs): + """ URL Mock """ + return {'status_code': 201, + 'content': b'OK'} + + with HTTMock(woo_test_mock): + # call requests + status = self.api.post("products", {}).status_code + self.assertEqual(status, 201) + + def test_put(self): + """ Test PUT requests """ + @all_requests + def woo_test_mock(*args, **kwargs): + """ URL Mock """ + return {'status_code': 200, + 'content': b'OK'} + + with HTTMock(woo_test_mock): + # call requests + status = self.api.put("products", {}).status_code + self.assertEqual(status, 200) + + def test_delete(self): + """ Test DELETE requests """ + @all_requests + def woo_test_mock(*args, **kwargs): + """ URL Mock """ + return {'status_code': 200, + 'content': b'OK'} + + with HTTMock(woo_test_mock): + # call requests + status = self.api.delete("products").status_code + self.assertEqual(status, 200) + + # @unittest.skip("going by RRC 5849 sorting instead") + def test_oauth_sorted_params(self): + """ Test order of parameters for OAuth signature """ + def check_sorted(keys, expected): + params = auth.OrderedDict() + for key in keys: + params[key] = '' + + params = UrlUtils.sorted_params(params) + ordered = [key for key, value in params] + self.assertEqual(ordered, expected) + + check_sorted(['a', 'b'], ['a', 'b']) + check_sorted(['b', 'a'], ['a', 'b']) + check_sorted(['a', 'b[a]', 'b[b]', 'b[c]', 'c'], + ['a', 'b[a]', 'b[b]', 'b[c]', 'c']) + check_sorted(['a', 'b[c]', 'b[a]', 'b[b]', 'c'], + ['a', 'b[c]', 'b[a]', 'b[b]', 'c']) + check_sorted(['d', 'b[c]', 'b[a]', 'b[b]', 'c'], + ['b[c]', 'b[a]', 'b[b]', 'c', 'd']) + check_sorted(['a1', 'b[c]', 'b[a]', 'b[b]', 'a2'], + ['a1', 'a2', 'b[c]', 'b[a]', 'b[b]']) + + +class WCApiTestCasesBase(unittest.TestCase): + """ Base class for WC API Test cases """ + + def setUp(self): + Auth.force_timestamp = CURRENT_TIMESTAMP + Auth.force_nonce = SHITTY_NONCE + self.api_params = { + 'url': 'http://localhost:8083/', + 'api': 'wc-api', + 'version': 'v3', + 'consumer_key': 'ck_659f6994ae88fed68897f9977298b0e19947979a', + 'consumer_secret': 'cs_9421d39290f966172fef64ae18784a2dc7b20976', + } + + +class WCApiTestCasesLegacy(WCApiTestCasesBase): + """ Tests for WC API V3 """ + + def setUp(self): + super(WCApiTestCasesLegacy, self).setUp() + self.api_params['version'] = 'v3' + self.api_params['api'] = 'wc-api' + + def test_APIGet(self): + wcapi = API(**self.api_params) + response = wcapi.get('products') + # print UrlUtils.beautify_response(response) + self.assertIn(response.status_code, [200, 201]) + response_obj = response.json() + self.assertIn('products', response_obj) + self.assertEqual(len(response_obj['products']), 10) + # print "test_APIGet", response_obj + + def test_APIGetWithSimpleQuery(self): + wcapi = API(**self.api_params) + response = wcapi.get('products?page=2') + # print UrlUtils.beautify_response(response) + self.assertIn(response.status_code, [200, 201]) + + response_obj = response.json() + self.assertIn('products', response_obj) + self.assertEqual(len(response_obj['products']), 8) + # print "test_ApiGenWithSimpleQuery", response_obj + + def test_APIGetWithComplexQuery(self): + wcapi = API(**self.api_params) + response = wcapi.get('products?page=2&filter%5Blimit%5D=2') + self.assertIn(response.status_code, [200, 201]) + response_obj = response.json() + self.assertIn('products', response_obj) + self.assertEqual(len(response_obj['products']), 2) + + response = wcapi.get( + 'products?' + 'oauth_consumer_key=ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX&' + 'oauth_nonce=037470f3b08c9232b0888f52cb9d4685b44d8fd1&' + 'oauth_signature=wrKfuIjbwi%2BTHynAlTP4AssoPS0%3D&' + 'oauth_signature_method=HMAC-SHA1&' + 'oauth_timestamp=1481606275&' + 'filter%5Blimit%5D=3' + ) + self.assertIn(response.status_code, [200, 201]) + response_obj = response.json() + self.assertIn('products', response_obj) + self.assertEqual(len(response_obj['products']), 3) + + def test_APIPutWithSimpleQuery(self): + wcapi = API(**self.api_params) + response = wcapi.get('products') + first_product = (response.json())['products'][0] + original_title = first_product['title'] + product_id = first_product['id'] + + nonce = b"%f" % (random.random()) + response = wcapi.put('products/%s?filter%%5Blimit%%5D=5' % + (product_id), + {"product": {"title": text_type(nonce)}}) + request_params = UrlUtils.get_query_dict_singular(response.request.url) + response_obj = response.json() + self.assertEqual(response_obj['product']['title'], text_type(nonce)) + self.assertEqual(request_params['filter[limit]'], text_type(5)) + + wcapi.put('products/%s' % (product_id), + {"product": {"title": original_title}}) + + +class WCApiTestCases(WCApiTestCasesBase): + oauth1a_3leg = False + """ Tests for New wp-json/wc/v2 API """ + + def setUp(self): + super(WCApiTestCases, self).setUp() + self.api_params['version'] = 'wc/v2' + self.api_params['api'] = 'wp-json' + self.api_params['callback'] = 'http://127.0.0.1/oauth1_callback' + if self.oauth1a_3leg: + self.api_params['oauth1a_3leg'] = True + + # @debug_on() + def test_APIGet(self): + wcapi = API(**self.api_params) + per_page = 10 + response = wcapi.get('products?per_page=%d' % per_page) + self.assertIn(response.status_code, [200, 201]) + response_obj = response.json() + self.assertEqual(len(response_obj), per_page) + + def test_APIPutWithSimpleQuery(self): + wcapi = API(**self.api_params) + response = wcapi.get('products') + first_product = (response.json())[0] + # from pprint import pformat + # print "first product %s" % pformat(response.json()) + original_title = first_product['name'] + product_id = first_product['id'] + + nonce = b"%f" % (random.random()) + response = wcapi.put('products/%s?page=2&per_page=5' % + (product_id), {"name": text_type(nonce)}) + request_params = UrlUtils.get_query_dict_singular(response.request.url) + response_obj = response.json() + self.assertEqual(response_obj['name'], text_type(nonce)) + self.assertEqual(request_params['per_page'], '5') + + wcapi.put('products/%s' % (product_id), {"name": original_title}) + + @unittest.skipIf(six.PY2, "non-utf8 bytes not supported in python2") + def test_APIPostWithBytesQuery(self): + wcapi = API(**self.api_params) + nonce = b"%f\xff" % random.random() + + data = { + "name": nonce, + "type": "simple", + } + + response = wcapi.post('products', data) + response_obj = response.json() + product_id = response_obj.get('id') + + expected = StrUtils.to_text(nonce, encoding='ascii', errors='replace') + + self.assertEqual( + response_obj.get('name'), + expected, + ) + wcapi.delete('products/%s' % product_id) + + @unittest.skipIf(six.PY2, "non-utf8 bytes not supported in python2") + def test_APIPostWithLatin1Query(self): + wcapi = API(**self.api_params) + nonce = "%f\u00ae" % random.random() + + data = { + "name": nonce.encode('latin-1'), + "type": "simple", + } + + response = wcapi.post('products', data) + response_obj = response.json() + product_id = response_obj.get('id') + + expected = StrUtils.to_text( + StrUtils.to_binary(nonce, encoding='latin-1'), + encoding='ascii', errors='replace' + ) + + self.assertEqual( + response_obj.get('name'), + expected + ) + wcapi.delete('products/%s' % product_id) + + def test_APIPostWithUTF8Query(self): + wcapi = API(**self.api_params) + nonce = "%f\u00ae" % random.random() + + data = { + "name": nonce.encode('utf8'), + "type": "simple", + } + + response = wcapi.post('products', data) + response_obj = response.json() + product_id = response_obj.get('id') + self.assertEqual(response_obj.get('name'), nonce) + wcapi.delete('products/%s' % product_id) + + def test_APIPostWithUnicodeQuery(self): + wcapi = API(**self.api_params) + nonce = "%f\u00ae" % random.random() + + data = { + "name": nonce, + "type": "simple", + } + + response = wcapi.post('products', data) + response_obj = response.json() + product_id = response_obj.get('id') + self.assertEqual(response_obj.get('name'), nonce) + wcapi.delete('products/%s' % product_id) + + +@unittest.skip("these simply don't work for some reason") +class WCApiTestCases3Leg(WCApiTestCases): + """ Tests for New wp-json/wc/v2 API with 3-leg """ + oauth1a_3leg = True + + +class WPAPITestCasesBase(unittest.TestCase): + api_params = { + 'url': 'http://localhost:8083/', + 'api': 'wp-json', + 'version': 'wp/v2', + 'consumer_key': 'tYG1tAoqjBEM', + 'consumer_secret': 's91fvylVrqChwzzDbEJHEWyySYtAmlIsqqYdjka1KyVDdAyB', + 'callback': 'http://127.0.0.1/oauth1_callback', + 'wp_user': 'admin', + 'wp_pass': 'admin', + 'oauth1a_3leg': True, + } + + def setUp(self): + Auth.force_timestamp = CURRENT_TIMESTAMP + Auth.force_nonce = SHITTY_NONCE + self.wpapi = API(**self.api_params) + + # @debug_on() + def test_APIGet(self): + response = self.wpapi.get('users/me') + self.assertIn(response.status_code, [200, 201]) + response_obj = response.json() + self.assertEqual(response_obj['name'], self.api_params['wp_user']) + + def test_APIGetWithSimpleQuery(self): + response = self.wpapi.get('pages?page=2&per_page=2') + self.assertIn(response.status_code, [200, 201]) + + response_obj = response.json() + self.assertEqual(len(response_obj), 2) + + def test_APIPostData(self): + nonce = "%f\u00ae" % random.random() + + content = "api test post" + + data = { + "title": nonce, + "content": content, + "excerpt": content + } + + response = self.wpapi.post('posts', data) + response_obj = response.json() + post_id = response_obj.get('id') + self.assertEqual(response_obj.get('title').get('raw'), nonce) + self.wpapi.delete('posts/%s' % post_id) + + def test_APIPostBadData(self): + """ + No excerpt so should fail to be created. + """ + nonce = "%f\u00ae" % random.random() + + data = { + 'a': nonce + } + + with self.assertRaises(UserWarning): + self.wpapi.post('posts', data) + + def test_APIPostBadDataHandleBadStatus(self): + """ + Test handling explicitly a bad status code for a request. + """ + nonce = "%f\u00ae" % random.random() + + data = { + 'a': nonce + } + + response = self.wpapi.post('posts', data, handle_status_codes=[400]) + self.assertEqual(response.status_code, 400) + + # If we don't specify a correct status code to handle we should + # still expect an exception + with self.assertRaises(UserWarning): + self.wpapi.post('posts', data, handle_status_codes=[404]) + + def test_APIPostMedia(self): + img_path = 'tests/data/test.jpg' + with open(img_path, 'rb') as test_file: + img_data = test_file.read() + img_name = os.path.basename(img_path) + + res = self.wpapi.post( + 'media', + data=img_data, + headers={ + 'Content-Type': 'image/jpg', + 'Content-Disposition' : 'attachment; filename=%s'% img_name + } + ) + + self.assertEqual(res.status_code, 201) + res_obj = res.json() + created_id = res_obj.get('id') + self.assertTrue(created_id) + uploaded_res = requests.get(res_obj.get('source_url')) + + # check for bug where image bytestream was quoted + self.assertNotEqual(StrUtils.to_binary(uploaded_res.text[0]), b'"') + + self.wpapi.delete('media/%s?force=True' % created_id) + + def test_APIPostComplexContent(self): + data = { + 'content': "this content has links" + } + res = self.wpapi.post('posts', data) + + self.assertEqual(res.status_code, 201) + res_obj = res.json() + res_id= res_obj.get('id') + self.assertTrue(res_id) + print(res_obj) + res_content = res_obj.get('content').get('raw') + self.assertEqual(data.get('content'), res_content) + + # def test_APIPostMediaBadCreds(self): + # """ + # TODO: make sure the warning is "ensure login and basic auth is installed" + # """ + # img_path = 'tests/data/test.jpg' + # with open(img_path, 'rb') as test_file: + # img_data = test_file.read() + # img_name = os.path.basename(img_path) + # res = self.wpapi.post( + # 'media', + # data=img_data, + # headers={ + # 'Content-Type': 'image/jpg', + # 'Content-Disposition' : 'attachment; filename=%s'% img_name + # } + # ) + + +class WPAPITestCasesBasic(WPAPITestCasesBase): + api_params = dict(**WPAPITestCasesBase.api_params) + api_params.update({ + 'user_auth': True, + 'basic_auth': True, + 'query_string_auth': False, + }) + + +class WPAPITestCases3leg(WPAPITestCasesBase): + + api_params = dict(**WPAPITestCasesBase.api_params) + api_params.update({ + 'creds_store': '~/wc-api-creds-test.json', + }) + + def setUp(self): + super(WPAPITestCases3leg, self).setUp() + self.wpapi.auth.clear_stored_creds() diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..42893ce --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,478 @@ +""" API Tests """ +from __future__ import unicode_literals + +import random +import unittest +from collections import OrderedDict +from copy import copy +from tempfile import mkstemp + +from httmock import HTTMock, urlmatch +from six import text_type +from six.moves.urllib.parse import parse_qsl, urlparse +from wordpress.api import API +from wordpress.auth import OAuth +from wordpress.helpers import StrUtils, UrlUtils + + +class BasicAuthTestcases(unittest.TestCase): + def setUp(self): + self.base_url = "http://localhost:8888/wp-api/" + self.api_name = 'wc-api' + self.api_ver = 'v3' + self.endpoint = 'products/26' + self.signature_method = "HMAC-SHA1" + + self.consumer_key = "ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + self.consumer_secret = "cs_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + self.api_params = dict( + url=self.base_url, + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret, + basic_auth=True, + api=self.api_name, + version=self.api_ver, + query_string_auth=False, + ) + + def test_endpoint_url(self): + api = API( + **self.api_params + ) + endpoint_url = api.requester.endpoint_url(self.endpoint) + endpoint_url = api.auth.get_auth_url(endpoint_url, 'GET') + self.assertEqual( + endpoint_url, + UrlUtils.join_components([ + self.base_url, self.api_name, self.api_ver, self.endpoint + ]) + ) + + def test_query_string_endpoint_url(self): + query_string_api_params = dict(**self.api_params) + query_string_api_params.update(dict(query_string_auth=True)) + api = API( + **query_string_api_params + ) + endpoint_url = api.requester.endpoint_url(self.endpoint) + endpoint_url = api.auth.get_auth_url(endpoint_url, 'GET') + expected_endpoint_url = '%s?consumer_key=%s&consumer_secret=%s' % ( + self.endpoint, self.consumer_key, self.consumer_secret) + expected_endpoint_url = UrlUtils.join_components( + [self.base_url, self.api_name, self.api_ver, expected_endpoint_url] + ) + self.assertEqual( + endpoint_url, + expected_endpoint_url + ) + endpoint_url = api.requester.endpoint_url(self.endpoint) + endpoint_url = api.auth.get_auth_url(endpoint_url, 'GET') + + +class OAuthTestcases(unittest.TestCase): + + def setUp(self): + self.base_url = "http://localhost:8888/wordpress/" + self.api_name = 'wc-api' + self.api_ver = 'v3' + self.endpoint = 'products/99' + self.signature_method = "HMAC-SHA1" + self.consumer_key = "ck_681c2be361e415519dce4b65ee981682cda78bc6" + self.consumer_secret = "cs_b11f652c39a0afd3752fc7bb0c56d60d58da5877" + + self.wcapi = API( + url=self.base_url, + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret, + api=self.api_name, + version=self.api_ver, + signature_method=self.signature_method + ) + + self.rfc1_api_url = 'https://photos.example.net/' + self.rfc1_consumer_key = 'dpf43f3p2l4k3l03' + self.rfc1_consumer_secret = 'kd94hf93k423kf44' + self.rfc1_oauth_token = 'hh5s93j4hdidpola' + self.rfc1_signature_method = 'HMAC-SHA1' + self.rfc1_callback = 'http://printer.example.com/ready' + self.rfc1_api = API( + url=self.rfc1_api_url, + consumer_key=self.rfc1_consumer_key, + consumer_secret=self.rfc1_consumer_secret, + api='', + version='', + callback=self.rfc1_callback, + wp_user='', + wp_pass='', + oauth1a_3leg=True + ) + self.rfc1_request_method = 'POST' + self.rfc1_request_target_url = 'https://photos.example.net/initiate' + self.rfc1_request_timestamp = '137131200' + self.rfc1_request_nonce = 'wIjqoS' + self.rfc1_request_params = [ + ('oauth_consumer_key', self.rfc1_consumer_key), + ('oauth_signature_method', self.rfc1_signature_method), + ('oauth_timestamp', self.rfc1_request_timestamp), + ('oauth_nonce', self.rfc1_request_nonce), + ('oauth_callback', self.rfc1_callback), + ] + self.rfc1_request_signature = b'74KNZJeDHnMBp0EMJ9ZHt/XKycU=' + + self.twitter_api_url = "https://api.twitter.com/" + self.twitter_consumer_secret = \ + "kAcSOqF21Fu85e7zjz7ZN2U4ZRhfV3WpwPAoE3Z7kBw" + self.twitter_consumer_key = "xvz1evFS4wEEPTGEFPHBog" + self.twitter_signature_method = "HMAC-SHA1" + self.twitter_api = API( + url=self.twitter_api_url, + consumer_key=self.twitter_consumer_key, + consumer_secret=self.twitter_consumer_secret, + api='', + version='1', + signature_method=self.twitter_signature_method, + ) + + self.twitter_method = "POST" + self.twitter_target_url = ( + "https://api.twitter.com/1/statuses/update.json?" + "include_entities=true" + ) + self.twitter_params_raw = [ + ("status", "Hello Ladies + Gentlemen, a signed OAuth request!"), + ("include_entities", "true"), + ("oauth_consumer_key", self.twitter_consumer_key), + ("oauth_nonce", "kYjzVBB8Y0ZFabxSWbWovY3uYSQ2pTgmZeNu2VS4cg"), + ("oauth_signature_method", self.twitter_signature_method), + ("oauth_timestamp", "1318622958"), + ("oauth_token", + "370773112-GmHxMAgYyLbNEtIKZeRNFsMKPR9EyMZeS9weJAEb"), + ("oauth_version", "1.0"), + ] + self.twitter_param_string = ( + r"include_entities=true&" + r"oauth_consumer_key=xvz1evFS4wEEPTGEFPHBog&" + r"oauth_nonce=kYjzVBB8Y0ZFabxSWbWovY3uYSQ2pTgmZeNu2VS4cg&" + r"oauth_signature_method=HMAC-SHA1&" + r"oauth_timestamp=1318622958&" + r"oauth_token=370773112-GmHxMAgYyLbNEtIKZeRNFsMKPR9EyMZeS9weJAEb&" + r"oauth_version=1.0&" + r"status=Hello%20Ladies%20%2B%20Gentlemen%2C%20a%20" + r"signed%20OAuth%20request%21" + ) + self.twitter_signature_base_string = ( + r"POST&" + r"https%3A%2F%2Fapi.twitter.com%2F1%2Fstatuses%2Fupdate.json&" + r"include_entities%3Dtrue%26" + r"oauth_consumer_key%3Dxvz1evFS4wEEPTGEFPHBog%26" + r"oauth_nonce%3DkYjzVBB8Y0ZFabxSWbWovY3uYSQ2pTgmZeNu2VS4cg%26" + r"oauth_signature_method%3DHMAC-SHA1%26" + r"oauth_timestamp%3D1318622958%26" + r"oauth_token%3D370773112-" + r"GmHxMAgYyLbNEtIKZeRNFsMKPR9EyMZeS9weJAEb%26" + r"oauth_version%3D1.0%26" + r"status%3DHello%2520Ladies%2520%252B%2520Gentlemen%252C%2520" + r"a%2520signed%2520OAuth%2520request%2521" + ) + self.twitter_token_secret = 'LswwdoUaIvS8ltyTt5jkRh4J50vUPVVHtR2YPi5kE' + self.twitter_signing_key = ( + 'kAcSOqF21Fu85e7zjz7ZN2U4ZRhfV3WpwPAoE3Z7kBw&' + 'LswwdoUaIvS8ltyTt5jkRh4J50vUPVVHtR2YPi5kE' + ) + self.twitter_oauth_signature = b'tnnArxj06cWHq44gCs1OSKk/jLY=' + + self.lexev_consumer_key = 'your_app_key' + self.lexev_consumer_secret = 'your_app_secret' + self.lexev_callback = 'http://127.0.0.1/oauth1_callback' + self.lexev_signature_method = 'HMAC-SHA1' + self.lexev_version = '1.0' + self.lexev_api = API( + url='https://bitbucket.org/', + api='api', + version='1.0', + consumer_key=self.lexev_consumer_key, + consumer_secret=self.lexev_consumer_secret, + signature_method=self.lexev_signature_method, + callback=self.lexev_callback, + wp_user='', + wp_pass='', + oauth1a_3leg=True + ) + self.lexev_request_method = 'POST' + self.lexev_request_url = \ + 'https://bitbucket.org/api/1.0/oauth/request_token' + self.lexev_request_nonce = '27718007815082439851427366369' + self.lexev_request_timestamp = '1427366369' + self.lexev_request_params = [ + ('oauth_callback', self.lexev_callback), + ('oauth_consumer_key', self.lexev_consumer_key), + ('oauth_nonce', self.lexev_request_nonce), + ('oauth_signature_method', self.lexev_signature_method), + ('oauth_timestamp', self.lexev_request_timestamp), + ('oauth_version', self.lexev_version), + ] + self.lexev_request_signature = b"iPdHNIu4NGOjuXZ+YCdPWaRwvJY=" + self.lexev_resource_url = ( + 'https://api.bitbucket.org/1.0/repositories/st4lk/' + 'django-articles-transmeta/branches' + ) + + def test_get_sign_key(self): + self.assertEqual( + StrUtils.to_binary( + self.wcapi.auth.get_sign_key(self.consumer_secret)), + StrUtils.to_binary("%s&" % self.consumer_secret) + ) + + self.assertEqual( + StrUtils.to_binary(self.wcapi.auth.get_sign_key( + self.twitter_consumer_secret, self.twitter_token_secret)), + StrUtils.to_binary(self.twitter_signing_key) + ) + + def test_flatten_params(self): + self.assertEqual( + StrUtils.to_binary(UrlUtils.flatten_params( + self.twitter_params_raw)), + StrUtils.to_binary(self.twitter_param_string) + ) + + def test_sorted_params(self): + # Example given in oauth.net: + oauthnet_example_sorted = [ + ('a', '1'), + ('c', 'hi%%20there'), + ('f', '25'), + ('f', '50'), + ('f', 'a'), + ('z', 'p'), + ('z', 't') + ] + + oauthnet_example = copy(oauthnet_example_sorted) + random.shuffle(oauthnet_example) + + self.assertEqual( + UrlUtils.sorted_params(oauthnet_example), + oauthnet_example_sorted + ) + + def test_get_signature_base_string(self): + twitter_param_string = OAuth.get_signature_base_string( + self.twitter_method, + self.twitter_params_raw, + self.twitter_target_url + ) + self.assertEqual( + twitter_param_string, + self.twitter_signature_base_string + ) + + def test_generate_oauth_signature(self): + + rfc1_request_signature = self.rfc1_api.auth.generate_oauth_signature( + self.rfc1_request_method, + self.rfc1_request_params, + self.rfc1_request_target_url, + '%s&' % self.rfc1_consumer_secret + ) + self.assertEqual( + text_type(rfc1_request_signature), + text_type(self.rfc1_request_signature) + ) + + # TEST WITH RFC EXAMPLE 3 DATA + + # TEST WITH TWITTER DATA + + twitter_signature = self.twitter_api.auth.generate_oauth_signature( + self.twitter_method, + self.twitter_params_raw, + self.twitter_target_url, + self.twitter_signing_key + ) + self.assertEqual(twitter_signature, self.twitter_oauth_signature) + + # TEST WITH LEXEV DATA + + lexev_request_signature = self.lexev_api.auth.generate_oauth_signature( + method=self.lexev_request_method, + params=self.lexev_request_params, + url=self.lexev_request_url + ) + self.assertEqual(lexev_request_signature, self.lexev_request_signature) + + def test_add_params_sign(self): + endpoint_url = self.wcapi.requester.endpoint_url('products?page=2') + + params = OrderedDict() + params["oauth_consumer_key"] = self.consumer_key + params["oauth_timestamp"] = "1477041328" + params["oauth_nonce"] = "166182658461433445531477041328" + params["oauth_signature_method"] = self.signature_method + params["oauth_version"] = "1.0" + params["oauth_callback"] = 'localhost:8888/wordpress' + + signed_url = self.wcapi.auth.add_params_sign( + "GET", endpoint_url, params) + + signed_url_params = parse_qsl(urlparse(signed_url).query) + # self.assertEqual('page', signed_url_params[-1][0]) + self.assertIn('page', dict(signed_url_params)) + + +class OAuth3LegTestcases(unittest.TestCase): + def setUp(self): + self.consumer_key = "ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + self.consumer_secret = "cs_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + self.api = API( + url="http://woo.test", + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret, + oauth1a_3leg=True, + wp_user='test_user', + wp_pass='test_pass', + callback='http://127.0.0.1/oauth1_callback' + ) + + @urlmatch(path=r'.*wp-json.*') + def woo_api_mock(*args, **kwargs): + """ URL Mock """ + return { + 'status_code': 200, + 'content': b""" + { + "name": "Wordpress", + "description": "Just another WordPress site", + "url": "http://localhost:8888/wordpress", + "home": "http://localhost:8888/wordpress", + "namespaces": [ + "wp/v2", + "oembed/1.0", + "wc/v1" + ], + "authentication": { + "oauth1": { + "request": + "http://localhost:8888/wordpress/oauth1/request", + "authorize": + "http://localhost:8888/wordpress/oauth1/authorize", + "access": + "http://localhost:8888/wordpress/oauth1/access", + "version": "0.1" + } + } + } + """ + } + + @urlmatch(path=r'.*oauth.*') + def woo_authentication_mock(*args, **kwargs): + """ URL Mock """ + return { + 'status_code': 200, + 'content': + b"""oauth_token=XXXXXXXXXXXX&oauth_token_secret=YYYYYYYYYYYY""" + } + + def test_get_sign_key(self): + oauth_token_secret = "PNW9j1yBki3e7M7EqB5qZxbe9n5tR6bIIefSMQ9M2pdyRI9g" + + key = self.api.auth.get_sign_key( + self.consumer_secret, oauth_token_secret) + self.assertEqual( + StrUtils.to_binary(key), + StrUtils.to_binary("%s&%s" % + (self.consumer_secret, oauth_token_secret)) + ) + + def test_auth_discovery(self): + + with HTTMock(self.woo_api_mock): + # call requests + authentication = self.api.auth.authentication + self.assertEquals( + authentication, + { + "oauth1": { + "request": + "http://localhost:8888/wordpress/oauth1/request", + "authorize": + "http://localhost:8888/wordpress/oauth1/authorize", + "access": + "http://localhost:8888/wordpress/oauth1/access", + "version": "0.1" + } + } + ) + + def test_get_request_token(self): + + with HTTMock(self.woo_api_mock): + authentication = self.api.auth.authentication + self.assertTrue(authentication) + + with HTTMock(self.woo_authentication_mock): + request_token, request_token_secret = \ + self.api.auth.get_request_token() + self.assertEquals(request_token, 'XXXXXXXXXXXX') + self.assertEquals(request_token_secret, 'YYYYYYYYYYYY') + + def test_store_access_creds(self): + _, creds_store_path = mkstemp( + "wp-api-python-test-store-access-creds.json") + api = API( + url="http://woo.test", + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret, + oauth1a_3leg=True, + wp_user='test_user', + wp_pass='test_pass', + callback='http://127.0.0.1/oauth1_callback', + access_token='XXXXXXXXXXXX', + access_token_secret='YYYYYYYYYYYY', + creds_store=creds_store_path + ) + api.auth.store_access_creds() + + with open(creds_store_path) as creds_store_file: + self.assertEqual( + creds_store_file.read(), + ('{"access_token": "XXXXXXXXXXXX", ' + '"access_token_secret": "YYYYYYYYYYYY"}') + ) + + def test_retrieve_access_creds(self): + _, creds_store_path = mkstemp( + "wp-api-python-test-store-access-creds.json") + with open(creds_store_path, 'w+') as creds_store_file: + creds_store_file.write( + ('{"access_token": "XXXXXXXXXXXX", ' + '"access_token_secret": "YYYYYYYYYYYY"}')) + + api = API( + url="http://woo.test", + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret, + oauth1a_3leg=True, + wp_user='test_user', + wp_pass='test_pass', + callback='http://127.0.0.1/oauth1_callback', + creds_store=creds_store_path + ) + + api.auth.retrieve_access_creds() + + self.assertEqual( + api.auth.access_token, + 'XXXXXXXXXXXX' + ) + + self.assertEqual( + api.auth.access_token_secret, + 'YYYYYYYYYYYY' + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_helpers.py b/tests/test_helpers.py new file mode 100644 index 0000000..da07de8 --- /dev/null +++ b/tests/test_helpers.py @@ -0,0 +1,188 @@ +""" API Tests """ +from __future__ import unicode_literals + +import unittest + +from six import text_type +from wordpress.helpers import SeqUtils, StrUtils, UrlUtils + + +class HelperTestcase(unittest.TestCase): + def setUp(self): + self.test_url = ( + "http://ich.local:8888/woocommerce/wc-api/v3/products?" + "filter%5Blimit%5D=2&" + "oauth_consumer_key=ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX&" + "oauth_nonce=c4f2920b0213c43f2e8d3d3333168ec4a22222d1&" + "oauth_signature=3ibOjMuhj6JGnI43BQZGniigHh8%3D&" + "oauth_signature_method=HMAC-SHA1&" + "oauth_timestamp=1481601370&page=2" + ) + + def test_url_is_ssl(self): + self.assertTrue(UrlUtils.is_ssl("https://woo.test:8888")) + self.assertFalse(UrlUtils.is_ssl("http://woo.test:8888")) + + def test_url_substitute_query(self): + self.assertEqual( + UrlUtils.substitute_query( + "https://woo.test:8888/sdf?param=value", "newparam=newvalue"), + "https://woo.test:8888/sdf?newparam=newvalue" + ) + self.assertEqual( + UrlUtils.substitute_query("https://woo.test:8888/sdf?param=value"), + "https://woo.test:8888/sdf" + ) + self.assertEqual( + UrlUtils.substitute_query( + "https://woo.test:8888/sdf?param=value", + "newparam=newvalue&othernewparam=othernewvalue" + ), + ( + "https://woo.test:8888/sdf?newparam=newvalue&" + "othernewparam=othernewvalue" + ) + ) + self.assertEqual( + UrlUtils.substitute_query( + "https://woo.test:8888/sdf?param=value", + "newparam=newvalue&othernewparam=othernewvalue" + ), + ( + "https://woo.test:8888/sdf?newparam=newvalue&" + "othernewparam=othernewvalue" + ) + ) + + def test_url_add_query(self): + self.assertEqual( + "https://woo.test:8888/sdf?param=value&newparam=newvalue", + UrlUtils.add_query( + "https://woo.test:8888/sdf?param=value", 'newparam', 'newvalue' + ) + ) + + def test_url_join_components(self): + self.assertEqual( + 'https://woo.test:8888/wp-json', + UrlUtils.join_components(['https://woo.test:8888/', '', 'wp-json']) + ) + self.assertEqual( + 'https://woo.test:8888/wp-json/wp/v2', + UrlUtils.join_components( + ['https://woo.test:8888/', 'wp-json', 'wp/v2']) + ) + + def test_url_get_php_value(self): + self.assertEqual( + '1', + UrlUtils.get_value_like_as_php(True) + ) + self.assertEqual( + '', + UrlUtils.get_value_like_as_php(False) + ) + self.assertEqual( + 'asd', + UrlUtils.get_value_like_as_php('asd') + ) + self.assertEqual( + '1', + UrlUtils.get_value_like_as_php(1) + ) + self.assertEqual( + '1', + UrlUtils.get_value_like_as_php(1.0) + ) + self.assertEqual( + '1.1', + UrlUtils.get_value_like_as_php(1.1) + ) + + def test_url_get_query_dict_singular(self): + result = UrlUtils.get_query_dict_singular(self.test_url) + self.assertEquals( + result, + { + 'filter[limit]': '2', + 'oauth_nonce': 'c4f2920b0213c43f2e8d3d3333168ec4a22222d1', + 'oauth_timestamp': '1481601370', + 'oauth_consumer_key': + 'ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', + 'oauth_signature_method': 'HMAC-SHA1', + 'oauth_signature': '3ibOjMuhj6JGnI43BQZGniigHh8=', + 'page': '2' + } + ) + + def test_url_get_query_singular(self): + result = UrlUtils.get_query_singular( + self.test_url, 'oauth_consumer_key') + self.assertEqual( + result, + 'ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX' + ) + result = UrlUtils.get_query_singular(self.test_url, 'filter[limit]') + self.assertEqual( + text_type(result), + text_type(2) + ) + + def test_url_set_query_singular(self): + result = UrlUtils.set_query_singular(self.test_url, 'filter[limit]', 3) + expected = ( + "http://ich.local:8888/woocommerce/wc-api/v3/products?" + "filter%5Blimit%5D=3&" + "oauth_consumer_key=ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX&" + "oauth_nonce=c4f2920b0213c43f2e8d3d3333168ec4a22222d1&" + "oauth_signature=3ibOjMuhj6JGnI43BQZGniigHh8%3D&" + "oauth_signature_method=HMAC-SHA1&oauth_timestamp=1481601370&" + "page=2" + ) + self.assertEqual(result, expected) + + def test_url_del_query_singular(self): + result = UrlUtils.del_query_singular(self.test_url, 'filter[limit]') + expected = ( + "http://ich.local:8888/woocommerce/wc-api/v3/products?" + "oauth_consumer_key=ck_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX&" + "oauth_nonce=c4f2920b0213c43f2e8d3d3333168ec4a22222d1&" + "oauth_signature=3ibOjMuhj6JGnI43BQZGniigHh8%3D&" + "oauth_signature_method=HMAC-SHA1&" + "oauth_timestamp=1481601370&" + "page=2" + ) + self.assertEqual(result, expected) + + def test_url_remove_default_port(self): + self.assertEqual( + UrlUtils.remove_default_port('http://www.gooogle.com:80/'), + 'http://www.gooogle.com/' + ) + self.assertEqual( + UrlUtils.remove_default_port('http://www.gooogle.com:18080/'), + 'http://www.gooogle.com:18080/' + ) + + def test_seq_filter_true(self): + self.assertEquals( + ['a', 'b', 'c', 'd'], + SeqUtils.filter_true([None, 'a', False, 'b', 'c', 'd']) + ) + + def test_str_remove_tail(self): + self.assertEqual( + 'sdf', + StrUtils.remove_tail('sdf/', '/') + ) + + def test_str_remove_head(self): + self.assertEqual( + 'sdf', + StrUtils.remove_head('/sdf', '/') + ) + + self.assertEqual( + 'sdf', + StrUtils.decapitate('sdf', '/') + ) diff --git a/tests/test_transport.py b/tests/test_transport.py new file mode 100644 index 0000000..a221d3c --- /dev/null +++ b/tests/test_transport.py @@ -0,0 +1,43 @@ +""" API Tests """ +from __future__ import unicode_literals + +import unittest + +from httmock import HTTMock, all_requests +from wordpress.transport import API_Requests_Wrapper + + +class TransportTestcases(unittest.TestCase): + def setUp(self): + self.requester = API_Requests_Wrapper( + url='https://woo.test:8888/', + api='wp-json', + api_version='wp/v2' + ) + + def test_api_url(self): + self.assertEqual( + 'https://woo.test:8888/wp-json', + self.requester.api_url + ) + + def test_endpoint_url(self): + self.assertEqual( + 'https://woo.test:8888/wp-json/wp/v2/posts', + self.requester.endpoint_url('posts') + ) + + def test_request(self): + @all_requests + def woo_test_mock(*args, **kwargs): + """ URL Mock """ + return {'status_code': 200, + 'content': b'OK'} + + with HTTMock(woo_test_mock): + # call requests + response = self.requester.request( + "GET", "https://woo.test:8888/wp-json/wp/v2/posts") + self.assertEqual(response.status_code, 200) + self.assertEqual(response.request.url, + 'https://woo.test:8888/wp-json/wp/v2/posts') diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..555779a --- /dev/null +++ b/tox.ini @@ -0,0 +1,14 @@ +# Tox (https://tox.readthedocs.io/) is a tool for running tests +# in multiple virtualenvs. This configuration file will run the +# test suite on all supported python versions. To use it, "pip install tox" +# and then run "tox" from this directory. + +[tox] +envlist = py27, py36 + +[testenv] +deps= + -rrequirements.txt + -rrequirements-test.txt +commands= + pytest diff --git a/woocommerce/__init__.py b/woocommerce/__init__.py deleted file mode 100644 index 7c4b16d..0000000 --- a/woocommerce/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# -*- coding: utf-8 -*- - -""" -woocommerce -~~~~~~~~~~~~~~~ -A Python wrapper for WooCommerce API. - -:copyright: (c) 2015 by WooThemes. -:license: MIT, see LICENSE for details. -""" - -__title__ = "woocommerce" -__version__ = "1.2.1" -__author__ = "Claudio Sanches @ WooThemes" -__license__ = "MIT" - -from woocommerce.api import API diff --git a/woocommerce/api.py b/woocommerce/api.py deleted file mode 100644 index d15a179..0000000 --- a/woocommerce/api.py +++ /dev/null @@ -1,113 +0,0 @@ -# -*- coding: utf-8 -*- - -""" -WooCommerce API Class -""" - -__title__ = "woocommerce-api" -__version__ = "1.2.1" -__author__ = "Claudio Sanches @ WooThemes" -__license__ = "MIT" - -from requests import request -from json import dumps as jsonencode -from woocommerce.oauth import OAuth - - -class API(object): - """ API Class """ - - def __init__(self, url, consumer_key, consumer_secret, **kwargs): - self.url = url - self.consumer_key = consumer_key - self.consumer_secret = consumer_secret - self.wp_api = kwargs.get("wp_api", False) - self.version = kwargs.get("version", "v3") - self.is_ssl = self.__is_ssl() - self.timeout = kwargs.get("timeout", 5) - self.verify_ssl = kwargs.get("verify_ssl", True) - self.query_string_auth = kwargs.get("query_string_auth", False) - - def __is_ssl(self): - """ Check if url use HTTPS """ - return self.url.startswith("https") - - def __get_url(self, endpoint): - """ Get URL for requests """ - url = self.url - api = "wc-api" - - if url.endswith("/") is False: - url = "%s/" % url - - if self.wp_api: - api = "wp-json" - - return "%s%s/%s/%s" % (url, api, self.version, endpoint) - - def __get_oauth_url(self, url, method): - """ Generate oAuth1.0a URL """ - oauth = OAuth( - url=url, - consumer_key=self.consumer_key, - consumer_secret=self.consumer_secret, - version=self.version, - method=method - ) - - return oauth.get_oauth_url() - - def __request(self, method, endpoint, data): - """ Do requests """ - url = self.__get_url(endpoint) - auth = None - params = {} - headers = { - "user-agent": "WooCommerce API Client-Python/%s" % __version__, - "accept": "application/json" - } - - if self.is_ssl is True and self.query_string_auth is False: - auth = (self.consumer_key, self.consumer_secret) - elif self.is_ssl is True and self.query_string_auth is True: - params = { - "consumer_key": self.consumer_key, - "consumer_secret": self.consumer_secret - } - else: - url = self.__get_oauth_url(url, method) - - if data is not None: - data = jsonencode(data, ensure_ascii=False).encode('utf-8') - headers["content-type"] = "application/json;charset=utf-8" - - return request( - method=method, - url=url, - verify=self.verify_ssl, - auth=auth, - params=params, - data=data, - timeout=self.timeout, - headers=headers - ) - - def get(self, endpoint): - """ Get requests """ - return self.__request("GET", endpoint, None) - - def post(self, endpoint, data): - """ POST requests """ - return self.__request("POST", endpoint, data) - - def put(self, endpoint, data): - """ PUT requests """ - return self.__request("PUT", endpoint, data) - - def delete(self, endpoint): - """ DELETE requests """ - return self.__request("DELETE", endpoint, None) - - def options(self, endpoint): - """ OPTIONS requests """ - return self.__request("OPTIONS", endpoint, None) diff --git a/woocommerce/oauth.py b/woocommerce/oauth.py deleted file mode 100644 index 2dd4696..0000000 --- a/woocommerce/oauth.py +++ /dev/null @@ -1,139 +0,0 @@ -# -*- coding: utf-8 -*- - -""" -WooCommerce OAuth1.0a Class -""" - -__title__ = "woocommerce-oauth" -__version__ = "1.2.1" -__author__ = "Claudio Sanches @ WooThemes" -__license__ = "MIT" - -from time import time -from random import randint -from hmac import new as HMAC -from hashlib import sha1, sha256 -from base64 import b64encode - -try: - from urllib.parse import urlencode, quote, unquote, parse_qsl, urlparse -except ImportError: - from urllib import urlencode, quote, unquote - from urlparse import parse_qsl, urlparse - -try: - from collections import OrderedDict -except ImportError: - from ordereddict import OrderedDict - - -class OAuth(object): - """ API Class """ - - def __init__(self, url, consumer_key, consumer_secret, **kwargs): - self.url = url - self.consumer_key = consumer_key - self.consumer_secret = consumer_secret - self.version = kwargs.get("version", "v3") - self.method = kwargs.get("method", "GET") - - def get_oauth_url(self): - """ Returns the URL with OAuth params """ - params = OrderedDict() - - if "?" in self.url: - url = self.url[:self.url.find("?")] - for key, value in parse_qsl(urlparse(self.url).query): - params[key] = value - else: - url = self.url - - params["oauth_consumer_key"] = self.consumer_key - params["oauth_timestamp"] = int(time()) - params["oauth_nonce"] = self.generate_nonce() - params["oauth_signature_method"] = "HMAC-SHA256" - params["oauth_signature"] = self.generate_oauth_signature(params, url) - - query_string = urlencode(params) - - return "%s?%s" % (url, query_string) - - def generate_oauth_signature(self, params, url): - """ Generate OAuth Signature """ - if "oauth_signature" in params.keys(): - del params["oauth_signature"] - - base_request_uri = quote(url, "") - params = self.sorted_params(params) - params = self.normalize_parameters(params) - query_params = ["{param_key}%3D{param_value}".format(param_key=key, param_value=value) - for key, value in params.items()] - - query_string = "%26".join(query_params) - string_to_sign = "%s&%s&%s" % (self.method, base_request_uri, query_string) - - consumer_secret = str(self.consumer_secret) - if self.version not in ["v1", "v2"]: - consumer_secret += "&" - - hash_signature = HMAC( - consumer_secret.encode(), - str(string_to_sign).encode(), - sha256 - ).digest() - - return b64encode(hash_signature).decode("utf-8").replace("\n", "") - - @staticmethod - def sorted_params(params): - ordered = OrderedDict() - base_keys = sorted(set(k.split('[')[0] for k in params.keys())) - - for base in base_keys: - for key in params.keys(): - if key == base or key.startswith(base + '['): - ordered[key] = params[key] - - return ordered - - @staticmethod - def normalize_parameters(params): - """ Normalize parameters """ - params = params or {} - normalized_parameters = OrderedDict() - - def get_value_like_as_php(val): - """ Prepare value for quote """ - try: - base = basestring - except NameError: - base = (str, bytes) - - if isinstance(val, base): - return val - elif isinstance(val, bool): - return "1" if val else "" - elif isinstance(val, int): - return str(val) - elif isinstance(val, float): - return str(int(val)) if val % 1 == 0 else str(val) - else: - return "" - - for key, value in params.items(): - value = get_value_like_as_php(value) - key = quote(unquote(str(key))).replace("%", "%25") - value = quote(unquote(str(value))).replace("%", "%25") - normalized_parameters[key] = value - - return normalized_parameters - - @staticmethod - def generate_nonce(): - """ Generate nonce number """ - nonce = ''.join([str(randint(0, 9)) for i in range(8)]) - return HMAC( - nonce.encode(), - "secret".encode(), - sha1 - ).hexdigest() diff --git a/wordpress/__init__.py b/wordpress/__init__.py new file mode 100644 index 0000000..ab933ec --- /dev/null +++ b/wordpress/__init__.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- + +""" +wordpress +~~~~~~~~~ +A Python wrapper for Wordpress and WooCommerce REST APIs. + +:copyright: (c) 2015 by WooThemes. +:license: MIT, see LICENSE for details. +""" + +__title__ = "wordpress" +__version__ = "1.2.9" +__author__ = "Claudio Sanches @ WooThemes, forked by Derwent" +__license__ = "MIT" + +__default_api_version__ = "wp/v2" +__default_api__ = "wp-json" + +from wordpress.api import API diff --git a/wordpress/api.py b/wordpress/api.py new file mode 100644 index 0000000..79a1114 --- /dev/null +++ b/wordpress/api.py @@ -0,0 +1,257 @@ +# -*- coding: utf-8 -*- + +""" +Wordpress API Class +""" + +from __future__ import unicode_literals + +# from requests import request +import logging + +from six import text_type +from wordpress.auth import BasicAuth, NoAuth, OAuth, OAuth_3Leg +from wordpress.helpers import StrUtils, UrlUtils +from wordpress.transport import API_Requests_Wrapper + +__title__ = "wordpress-api" + + +class API(object): + """ API Class """ + + def __init__(self, url, consumer_key, consumer_secret, **kwargs): + self.logger = logging.getLogger(__name__) + self.requester = API_Requests_Wrapper(url=url, **kwargs) + + auth_kwargs = dict( + requester=self.requester, + consumer_key=consumer_key, + consumer_secret=consumer_secret, + ) + auth_kwargs.update(kwargs) + + auth_class = OAuth + if kwargs.get('basic_auth'): + auth_class = BasicAuth + elif kwargs.get('oauth1a_3leg'): + auth_class = OAuth_3Leg + elif kwargs.get('no_auth'): + auth_class = NoAuth + + if ( + kwargs.get('version', '').startswith('wc') + and kwargs.get('oauth1a_3leg') + ): + self.logger.warn( + "WooCommerce JSON Api does not seem to support 3leg") + + self.auth = auth_class(**auth_kwargs) + + @property + def url(self): + return self.requester.url + + @property + def timeout(self): + return self.requester.timeout + + @property + def namespace(self): + return self.requester.api + + @property + def version(self): + return self.requester.api_version + + @property + def verify_ssl(self): + return self.requester.verify_ssl + + @property + def is_ssl(self): + return self.requester.is_ssl + + @property + def consumer_key(self): + return self.auth.consumer_key + + @property + def consumer_secret(self): + return self.auth.consumer_secret + + @property + def callback(self): + return self.auth.callback + + def request_post_mortem(self, response=None): + """ + Attempt to diagnose what went wrong in a request + """ + + reason = None + remedy = None + + response_json = {} + try: + response_json = response.json() + except ValueError: + pass + + # import pudb; pudb.set_trace() + + request_body = {} + request_url = "" + if hasattr(response, 'request'): + if hasattr(response.request, 'url'): + request_url = response.request.url + if hasattr(response.request, 'body'): + request_body = response.request.body + + try_hostname_mismatch = False + + if ( + isinstance(response_json, dict) + and ('code' in response_json or 'message' in response_json) + ): + reason = " - ".join([ + text_type(response_json.get(key)) + for key in ['code', 'message', 'data'] + if key in response_json + ]) + code = text_type(response_json.get('code')) + + if code == 'rest_user_invalid_email': + remedy = "Try checking the email %s doesn't already exist" % \ + request_body.get('email') + + elif code == 'json_oauth1_consumer_mismatch': + remedy = "Try deleting the cached credentials at %s" % \ + self.auth.creds_store + + elif code == 'woocommerce_rest_cannot_view': + if not self.auth.query_string_auth: + remedy = "Try enabling query_string_auth" + else: + remedy = ( + "This error is super generic and can be caused by " + "just about anything. Here are some things to try: \n" + " - Check that the account which as assigned to your " + "oAuth creds has the correct access level\n" + " - Enable logging and check for error messages in " + "wp-content and wp-content/uploads/wc-logs\n" + " - Check that your query string parameters are " + "valid\n" + " - Make sure your server is not messing with " + "authentication headers\n" + " - Try a different endpoint\n" + " - Try enabling HTTPS and using basic " + "authentication\n" + ) + + elif code == 'woocommerce_rest_authentication_error': + try_hostname_mismatch = True + + response_headers = {} + if hasattr(response, 'headers'): + response_headers = response.headers + + if not reason or try_hostname_mismatch: + requester_api_url = self.requester.api_url + links = [] + if hasattr(response, 'links') and response.links: + links = response.links + elif 'Link' in response_headers: + links = [response_headers['Link']] + if links: + first_link_key = list(links)[0] + header_api_url = links[first_link_key].get('url', '') + if header_api_url: + header_api_url = StrUtils.eviscerate(header_api_url, '/') + + if ( + header_api_url and requester_api_url + and StrUtils.to_text(header_api_url) + != StrUtils.to_text(requester_api_url) + ): + reason = "hostname mismatch. %s != %s" % tuple(map( + StrUtils.to_text, [ + header_api_url, requester_api_url + ] + )) + header_url = StrUtils.eviscerate(header_api_url, '/') + header_url = StrUtils.eviscerate( + header_url, self.requester.api) + header_url = StrUtils.eviscerate(header_url, '/') + remedy = "try changing url to %s" % header_url + + msg = ( + "API call to %s returned \nCODE: " + "%s\nRESPONSE:%s \nHEADERS: %s\nREQ_BODY:%s" + ) % tuple(map(StrUtils.to_text, [ + request_url, + response.status_code, + UrlUtils.beautify_response(response), + response_headers, + StrUtils.to_binary(request_body)[:1000] + ])) + if reason: + msg += "\nBecause of %s" % StrUtils.to_binary(reason) + if remedy: + msg += "\n%s" % remedy + raise UserWarning(msg) + + def __request(self, method, endpoint, data, **kwargs): + """ Do requests """ + + endpoint_url = self.requester.endpoint_url(endpoint) + endpoint_url = self.auth.get_auth_url(endpoint_url, method, **kwargs) + auth = self.auth.get_auth() + + content_type = 'application/json' + for key, value in kwargs.get('headers', {}).items(): + if key.lower() == 'content-type': + content_type = value.lower() + + if data is not None and content_type.startswith('application/json'): + data = StrUtils.jsonencode(data, ensure_ascii=False) + + # enforce utf-8 encoded binary + data = StrUtils.to_binary(data) + + handle_status_codes = kwargs.pop('handle_status_codes', []) + + response = self.requester.request( + method=method, + url=endpoint_url, + auth=auth, + data=data, + **kwargs + ) + + if response.status_code not in [200, 201, 202] + handle_status_codes: + self.request_post_mortem(response) + + return response + + # TODO add kwargs option for headers + + def get(self, endpoint, **kwargs): + """ Get requests """ + return self.__request("GET", endpoint, None, **kwargs) + + def post(self, endpoint, data, **kwargs): + """ POST requests """ + return self.__request("POST", endpoint, data, **kwargs) + + def put(self, endpoint, data, **kwargs): + """ PUT requests """ + return self.__request("PUT", endpoint, data, **kwargs) + + def delete(self, endpoint, **kwargs): + """ DELETE requests """ + return self.__request("DELETE", endpoint, None, **kwargs) + + def options(self, endpoint, **kwargs): + """ OPTIONS requests """ + return self.__request("OPTIONS", endpoint, None, **kwargs) diff --git a/wordpress/auth.py b/wordpress/auth.py new file mode 100644 index 0000000..819c957 --- /dev/null +++ b/wordpress/auth.py @@ -0,0 +1,703 @@ +# -*- coding: utf-8 -*- + +""" +Wordpress OAuth1.0a Class +""" + +__title__ = "wordpress-auth" + +import binascii +import json +import logging +import os +from collections import OrderedDict +from hashlib import sha1, sha256 +from hmac import new as HMAC +from pprint import pformat +from random import randint +from time import time + +import requests +from requests.auth import HTTPBasicAuth + +from bs4 import BeautifulSoup +from six.moves.urllib.parse import parse_qs, parse_qsl, quote, urlparse +from wordpress import __version__ + +from .helpers import StrUtils, UrlUtils + + +class Auth(object): + """ Boilerplate for handling authentication stuff. """ + + def __init__(self, requester, **kwargs): + self.requester = requester + self.logger = logging.getLogger(__name__) + self.query_string_auth = kwargs.pop('query_string_auth', True) + + @property + def api_version(self): + return self.requester.api_version + + @property + def api_namespace(self): + return self.requester.api + + def get_auth_url(self, endpoint_url, method): + """ Returns the URL with added Auth params """ + return endpoint_url + + def get_auth(self): + """ Returns the auth parameter used in requests """ + pass + + +class BasicAuth(Auth): + """ Does not perform any signing, just logs in with oauth creds """ + + def __init__(self, requester, consumer_key, consumer_secret, **kwargs): + super(BasicAuth, self).__init__(requester, **kwargs) + self.consumer_key = consumer_key + self.consumer_secret = consumer_secret + self.user_auth = kwargs.pop('user_auth', None) + self.wp_user = kwargs.pop('wp_user', None) + self.wp_pass = kwargs.pop('wp_pass', None) + + def get_auth_url(self, endpoint_url, method, **kwargs): + if self.query_string_auth: + endpoint_params = UrlUtils.get_query_dict_singular(endpoint_url) + endpoint_params.update({ + "consumer_key": self.consumer_key, + "consumer_secret": self.consumer_secret + }) + endpoint_url = UrlUtils.substitute_query( + endpoint_url, + UrlUtils.flatten_params(endpoint_params) + ) + return endpoint_url + + def get_auth(self): + if self.user_auth: + return HTTPBasicAuth(self.wp_user, self.wp_pass) + if not self.query_string_auth: + return HTTPBasicAuth(self.consumer_key, self.consumer_secret) + + +class NoAuth(Auth): + """ + Just a dummy Auth object to allow header based + authorization per request + """ + + def get_auth_url(self, endpoint_url, method, **kwargs): + return endpoint_url + + +class OAuth(Auth): + """ Signs string with oauth consumer_key and consumer_secret """ + oauth_version = '1.0' + force_nonce = None + force_timestamp = None + + """ API Class """ + + def __init__(self, requester, consumer_key, consumer_secret, **kwargs): + super(OAuth, self).__init__(requester, **kwargs) + if not self.query_string_auth: + raise UserWarning("Header Auth not supported for OAuth") + self.consumer_key = consumer_key + self.consumer_secret = consumer_secret + self.signature_method = kwargs.pop('signature_method', 'HMAC-SHA1') + self.force_timestamp = kwargs.pop('force_timestamp', None) + self.force_nonce = kwargs.pop('force_nonce', None) + + def get_sign_key(self, consumer_secret, token_secret=None): + """Get consumer_secret, convert to bytestring suitable for signing.""" + if not consumer_secret: + raise UserWarning("no consumer_secret provided") + token_secret = str(token_secret) if token_secret else '' + if self.api_namespace == 'wc-api' \ + and self.api_version in ["v1", "v2"]: + # special conditions for wc-api v1-2 + key = consumer_secret + else: + key = StrUtils.to_binary("%s&%s" % (consumer_secret, token_secret)) + return key + + def add_params_sign(self, method, url, params, sign_key=None, **kwargs): + """ + Add the params to a given url. + + Sign the url with sign_key if provided, otherwise generate + sign_key automatically and return a signed url. + """ + if isinstance(params, dict): + params = list(params.items()) + + urlparse_result = urlparse(url) + + if urlparse_result.query: + params += parse_qsl(urlparse_result.query) + # for key, value in parse_qsl(urlparse_result.query): + # params += [(key, value)] + + # headers = kwargs.get('headers', {}) + # if headers: + # params += headers.items() + + params = UrlUtils.unique_params(params) + params = UrlUtils.sorted_params(params) + + params_without_signature = [] + for key, value in params: + if key != "oauth_signature": + params_without_signature.append((key, value)) + + self.logger.debug('sorted_params before sign: %s' % + pformat(params_without_signature)) + + signature = self.generate_oauth_signature( + method, params_without_signature, url, sign_key) + + self.logger.debug('signature: %s' % signature) + + params = params_without_signature + [("oauth_signature", signature)] + + query_string = UrlUtils.flatten_params(params) + + return UrlUtils.substitute_query(url, query_string) + + def get_params(self): + return [ + ("oauth_consumer_key", self.consumer_key), + ("oauth_nonce", self.generate_nonce()), + ("oauth_signature_method", self.signature_method), + ("oauth_timestamp", self.generate_timestamp()), + ] + + def get_auth_url(self, endpoint_url, method, **kwargs): + """ Returns the URL with added Auth params """ + params = self.get_params() + + return self.add_params_sign(method, endpoint_url, params, **kwargs) + + @classmethod + def get_signature_base_string(cls, method, params, url): + # remove default port + url = UrlUtils.remove_default_port(url) + # ensure scheme is lowercase + url = UrlUtils.lower_scheme(url) + # remove query string parameters + url = UrlUtils.substitute_query(url) + base_request_uri = quote(url, "") + query_string = UrlUtils.flatten_params(params) + query_string = quote(query_string, '~') + return "%s&%s&%s" % ( + method.upper(), base_request_uri, query_string + ) + + def generate_oauth_signature(self, method, params, url, key=None): + """ Generate OAuth Signature """ + + string_to_sign = self.get_signature_base_string(method, params, url) + + if key is None: + key = self.get_sign_key(self.consumer_secret) + + if self.signature_method == 'HMAC-SHA1': + hmac_mod = sha1 + elif self.signature_method == 'HMAC-SHA256': + hmac_mod = sha256 + else: + raise UserWarning("Unknown signature_method") + + # print "\nstring_to_sign: %s" % repr(string_to_sign) + # print "\nkey: %s" % repr(key) + sig = HMAC( + StrUtils.to_binary(key), + StrUtils.to_binary(string_to_sign), + hmac_mod + ) + sig_b64 = binascii.b2a_base64(sig.digest())[:-1] + # print "\nsig_b64: %s" % sig_b64 + return sig_b64 + + @classmethod + def generate_timestamp(cls): + """ Generate timestamp """ + if cls.force_timestamp is not None: + return cls.force_timestamp + return int(time()) + + @classmethod + def generate_nonce(cls): + """ Generate nonce number """ + if cls.force_nonce is not None: + return cls.force_nonce + nonce = ''.join([str(randint(0, 9)) for i in range(8)]) + return HMAC( + nonce.encode(), + "secret".encode(), + sha1 + ).hexdigest() + + +class OAuth_3Leg(OAuth): + """ + Provide 3 legged OAuth1a. + + Mostly based off this: http://www.lexev.org/en/2015/oauth-step-step/ + """ + + # oauth_version = '1.0A' + + def __init__( + self, requester, consumer_key, consumer_secret, callback, **kwargs + ): + super(OAuth_3Leg, self).__init__( + requester, consumer_key, consumer_secret, **kwargs) + self.callback = callback + self.wp_user = kwargs.pop('wp_user', None) + self.wp_pass = kwargs.pop('wp_pass', None) + self._creds_store = kwargs.pop('creds_store', None) + self._authentication = None + self._request_token = kwargs.pop('request_token', None) + self.request_token_secret = None + self._oauth_verifier = None + self._access_token = kwargs.pop('access_token', None) + self.access_token_secret = kwargs.pop('access_token_secret', None) + + @property + def authentication(self): + """ + Provide authentication links discovered from the API. + + Automatically generated if accessed before generated. + """ + if not self._authentication: + self._authentication = self.discover_auth() + return self._authentication + + @property + def oauth_verifier(self): + """ + Verifier string used in authentication. + + Automatically generated if accessed before generated. + """ + if not self._oauth_verifier: + self._oauth_verifier = self.get_verifier() + return self._oauth_verifier + + @property + def request_token(self): + """ + OAuth token used in requesting an access_token. + + Automatically generated if accessed before generated. + """ + if not self._request_token: + self.get_request_token() + return self._request_token + + @property + def access_token(self): + """ + OAuth token used to sign requests to protected resources. + + Automatically generated if accessed before generated. + """ + if not self._access_token and self.creds_store: + self.retrieve_access_creds() + if not self._access_token: + self.get_access_token() + return self._access_token + + @property + def creds_store(self): + if self._creds_store: + return os.path.expanduser(self._creds_store) + + def get_auth_url(self, endpoint_url, method, **kwargs): + """ + Return the URL with OAuth params. + """ + assert self.access_token, "need a valid access token for this step" + assert self.access_token_secret, \ + "need a valid access token secret for this step" + + params = self.get_params() + params += [ + ('oauth_callback', self.callback), + ('oauth_token', self.access_token) + ] + + sign_key = self.get_sign_key( + self.consumer_secret, self.access_token_secret) + + self.logger.debug('sign_key: %s' % sign_key) + + return self.add_params_sign(method, endpoint_url, params, sign_key) + + def discover_auth(self): + """ + Discover the location of authentication resourcers from the API. + """ + discovery_url = self.requester.api_url + + response = self.requester.request('GET', discovery_url) + response_json = response.json() + + has_authentication_resources = True + + if 'authentication' in response_json: + authentication = response_json['authentication'] + if not isinstance(authentication, dict): + has_authentication_resources = False + else: + has_authentication_resources = False + + if not has_authentication_resources: + raise UserWarning( + ( + "Response does not include location of authentication " + "resources.\n" + "Resopnse: %s\n%s\n" + "Please check you have configured the Wordpress OAuth1 " + "plugin correctly." + ) % (response, response.text[:500]) + ) + + self._authentication = authentication + + return self._authentication + + def get_request_token(self): + """ + Uses the request authentication link to get an oauth_token for + requesting an access token + """ + + assert self.consumer_key, "need a valid consumer_key for this step" + + params = self.get_params() + params += [ + ('oauth_callback', self.callback) + ] + + request_token_url = self.authentication['oauth1']['request'] + request_token_url = self.add_params_sign( + "GET", request_token_url, params) + + response = self.requester.get(request_token_url) + self.logger.debug('get_request_token response: %s' % response.text) + resp_content = parse_qs(response.text) + + try: + self._request_token = resp_content['oauth_token'][0] + except: + raise UserWarning( + "Could not parse request_token in response from %s : %s" + % ( + repr(response.request.url), + UrlUtils.beautify_response(response)) + ) + try: + self.request_token_secret = resp_content['oauth_token_secret'][0] + except: + raise UserWarning( + "Could not parse request_token_secret in response from %s : %s" + % ( + repr(response.request.url), + UrlUtils.beautify_response(response)) + ) + + return self._request_token, self.request_token_secret + + def parse_login_form_error(self, response, exc, **kwargs): + """ + If unable to parse login form, try to determine which error is present + """ + login_form_soup = BeautifulSoup(response.text, 'lxml') + if response.status_code == 500: + error = login_form_soup.select_one('body#error-page') + if error and error.stripped_strings: + for stripped_string in error.stripped_strings: + if ( + "plase solve this math problem" + in stripped_string.lower() + ): + raise UserWarning( + "Can't log in if form has capcha ... yet") + raise UserWarning( + "could not parse login form error. %s " % str(error)) + if response.status_code == 200: + error = login_form_soup.select_one('div#login_error') + if error and error.stripped_strings: + for stripped_string in error.stripped_strings: + if "invalid token" in stripped_string.lower(): + raise UserWarning("Invalid token: %s" % + repr(kwargs.get('token'))) + elif "invalid username" in stripped_string.lower(): + raise UserWarning("Invalid username: %s" % + repr(kwargs.get('username'))) + elif "the password you entered" in stripped_string.lower(): + raise UserWarning("Invalid password: %s" % + repr(kwargs.get('password'))) + raise UserWarning( + "could not parse login form error. %s " % str(error)) + raise UserWarning( + "Login form response was code %s. original error: \n%s" % + (str(response.status_code), repr(exc)) + ) + + def get_form_info(self, response, form_id): + """ parses a form specified by a given form_id in the response, + extracts form data and form action """ + + assert \ + response.status_code is 200, \ + "login form response should be 200, not %s\n%s" % ( + response.status_code, + response.text + ) + response_soup = BeautifulSoup(response.text, "lxml") + form_soup = response_soup.select_one('form#%s' % form_id) + assert \ + form_soup, "unable to find form with id=%s in %s " \ + % ( + form_id, + (response_soup.prettify()).encode('ascii', + errors='backslashreplace') + ) + # print "login form: \n", form_soup.prettify() + + action = form_soup.get('action') + assert \ + action, "action should be provided by form: %s" \ + % (form_soup.prettify()).encode('ascii', errors='backslashreplace') + + form_data = OrderedDict() + for input_soup in ( + form_soup.select('input') + form_soup.select('button') + ): + name = input_soup.get('name') + if not name: + continue + value = input_soup.get('value') + if name not in form_data: + form_data[name] = [] + form_data[name].append(value) + + # print "form data: %s" % str(form_data) + return action, form_data + + def get_verifier(self, request_token=None, wp_user=None, wp_pass=None): + """ + Get verifier string from access token. + + Pretends to be a browser, uses the authorize auth link, + submits user creds to WP login form. + """ + + if request_token is None: + request_token = self.request_token + assert request_token, "need a valid request_token for this step" + + if wp_user is None and self.wp_user: + wp_user = self.wp_user + if wp_pass is None and self.wp_pass: + wp_pass = self.wp_pass + + authorize_url = self.authentication['oauth1']['authorize'] + authorize_url = UrlUtils.add_query( + authorize_url, 'oauth_token', request_token) + + # we're using a different session from the usual API calls + # (I think the headers are incompatible?) + + # self.requester.get(authorize_url) + authorize_session = requests.Session() + authorize_session.headers.update( + {'User-Agent': "Wordpress API Client-Python/%s" % __version__}) + + login_form_response = authorize_session.get(authorize_url) + login_form_params = { + 'username': wp_user, + 'password': wp_pass, + 'token': request_token + } + try: + login_form_action, login_form_data = self.get_form_info( + login_form_response, 'loginform') + except AssertionError as exc: + self.parse_login_form_error( + login_form_response, exc, **login_form_params + ) + + for name, values in login_form_data.items(): + if name == 'log': + login_form_data[name] = wp_user + elif name == 'pwd': + login_form_data[name] = wp_pass + else: + login_form_data[name] = values[0] + + assert 'log' in login_form_data, \ + 'input for user login did not appear on form' + assert 'pwd' in login_form_data, \ + 'input for user password did not appear on form' + + confirmation_response = authorize_session.post( + login_form_action, data=login_form_data, allow_redirects=True) + try: + authorize_form_action, authorize_form_data = self.get_form_info( + confirmation_response, 'oauth1_authorize_form') + except AssertionError as exc: + self.parse_login_form_error( + confirmation_response, exc, **login_form_params + ) + + for name, values in authorize_form_data.items(): + if name == 'wp-submit': + assert \ + 'authorize' in values, \ + "apparently no authorize button, only %s" % str(values) + authorize_form_data[name] = 'authorize' + else: + authorize_form_data[name] = values[0] + + assert 'wp-submit' in login_form_data, \ + 'authorize button did not appear on form' + + final_response = authorize_session.post( + authorize_form_action, data=authorize_form_data, + allow_redirects=False) + + assert final_response.status_code == 302, \ + ( + "was not redirected by authorize screen, " + "was %d instead. something went wrong" + % final_response.status_code + ) + assert 'location' in final_response.headers, \ + "redirect did not provide redirect location in header" + + final_location = final_response.headers['location'] + + # At this point we can chose to follow the redirect if the user wants, + # or just parse the verifier out of the redirect url. + # open to suggestions if anyone has any :) + + final_location_queries = parse_qs(urlparse(final_location).query) + + assert 'oauth_verifier' in final_location_queries, \ + ( + "oauth verifier not provided in final redirect: %s" + % final_location + ) + + self._oauth_verifier = final_location_queries['oauth_verifier'][0] + return self._oauth_verifier + + def store_access_creds(self): + """ store the access_token and access_token_secret locally. """ + + if not self.creds_store: + return + + creds = OrderedDict() + if self._access_token: + creds['access_token'] = self._access_token + if self.access_token_secret: + creds['access_token_secret'] = self.access_token_secret + if creds: + dirname = os.path.dirname(self.creds_store) + dirname = os.path.expanduser(dirname) + dirname = os.path.expandvars(dirname) + if not os.path.exists(dirname): + os.mkdir(dirname) + with open(self.creds_store, 'w+') as creds_store_file: + StrUtils.to_binary( + json.dump(creds, creds_store_file, ensure_ascii=False)) + + def retrieve_access_creds(self): + """Retrieve access_token / access_token_secret stored locally.""" + + if not self.creds_store: + return + + creds = {} + if os.path.isfile(self.creds_store): + with open(self.creds_store, 'r') as creds_store_file: + try: + creds = json.load(creds_store_file, encoding='utf-8') + except ValueError: + pass + + if 'access_token' in creds: + self._access_token = creds['access_token'] + if 'access_token_secret' in creds: + self.access_token_secret = creds['access_token_secret'] + + def clear_stored_creds(self): + """ Clear the file containing stored creds. """ + + if not self.creds_store: + return + + with open(self.creds_store, 'w+') as creds_store_file: + creds_store_file.write('') + + def get_access_token(self, oauth_verifier=None): + """ Uses the access authentication link to get an access token """ + + if oauth_verifier is None: + oauth_verifier = self.oauth_verifier + assert oauth_verifier, "Need an oauth verifier to perform this step" + assert self.request_token, \ + "Need a valid request_token to perform this step" + + params = self.get_params() + params += [ + ('oauth_token', self.request_token), + ('oauth_verifier', self.oauth_verifier) + ] + + sign_key = self.get_sign_key( + self.consumer_secret, self.request_token_secret) + + access_token_url = self.authentication['oauth1']['access'] + access_token_url = self.add_params_sign( + "POST", access_token_url, params, sign_key) + + access_response = self.requester.post(access_token_url) + + self.logger.debug('access_token response: %s' % access_response.text) + + assert \ + access_response.status_code == 200, \ + "Access request did not return 200, returned %s. HTML: %s" % ( + access_response.status_code, + UrlUtils.beautify_response(access_response) + ) + + # + access_response_queries = parse_qs(access_response.text) + + try: + self._access_token = access_response_queries['oauth_token'][0] + self.access_token_secret = \ + access_response_queries['oauth_token_secret'][0] + except: + raise UserWarning( + "Could not parse access_token or access_token_secret in " + "response from %s : %s" + % ( + repr(access_response.request.url), + UrlUtils.beautify_response(access_response)) + ) + + self.store_access_creds() + + return self._access_token, self.access_token_secret diff --git a/wordpress/helpers.py b/wordpress/helpers.py new file mode 100644 index 0000000..081af1b --- /dev/null +++ b/wordpress/helpers.py @@ -0,0 +1,405 @@ +# -*- coding: utf-8 -*- + +""" +Wordpress Hellper Class +""" + +__title__ = "wordpress-requests" + +import json +import locale +import os +import posixpath +import re +import sys +from collections import OrderedDict + +from bs4 import BeautifulSoup +from six import (PY2, PY3, binary_type, iterbytes, string_types, text_type, + unichr) +from six.moves import reduce +from six.moves.urllib.parse import ParseResult as URLParseResult +from six.moves.urllib.parse import (parse_qs, parse_qsl, quote, urlencode, + urlparse, urlunparse) + + +class StrUtils(object): + @classmethod + def remove_tail(cls, string, tail): + if string.endswith(tail): + string = string[:-len(tail)] + return string + + @classmethod + def remove_head(cls, string, head): + if string.startswith(head): + string = string[len(head):] + return string + + @classmethod + def decapitate(cls, *args, **kwargs): + return cls.remove_head(*args, **kwargs) + + @classmethod + def eviscerate(cls, *args, **kwargs): + return cls.remove_tail(*args, **kwargs) + + @classmethod + def to_text(cls, string, encoding='utf-8', errors='replace'): + if isinstance(string, text_type): + return string + if isinstance(string, binary_type): + try: + return string.decode(encoding, errors=errors) + except TypeError: + return ''.join([ + unichr(c) for c in iterbytes(string) + ]) + return text_type(string) + + @classmethod + def to_binary(cls, string, encoding='utf8', errors='backslashreplace'): + if isinstance(string, binary_type): + return string + if not isinstance(string, text_type): + string = text_type(string) + return string.encode(encoding, errors) + + @classmethod + def jsonencode(cls, data, **kwargs): + if PY2: + for encoding in filter(None, { + kwargs.get('encoding', 'utf8'), + sys.getdefaultencoding(), + sys.getfilesystemencoding(), + locale.getpreferredencoding(), + 'utf8', + }): + try: + kwargs['encoding'] = encoding + return json.dumps(data, **kwargs) + except UnicodeDecodeError: + pass + kwargs.pop('encoding', None) + kwargs['cls'] = BytesJsonEncoder + return json.dumps(data, **kwargs) + + +class BytesJsonEncoder(json.JSONEncoder): + def default(self, obj): + + if isinstance(obj, binary_type): + return StrUtils.to_text(obj, errors='replace') + # Let the base class default method raise the TypeError + return json.JSONEncoder.default(self, obj) + + +class SeqUtils(object): + @classmethod + def filter_true(cls, seq): + return [item for item in seq if item] + + @classmethod + def filter_unique_true(cls, list_a): + response = [] + for i in list_a: + if i and i not in response: + response.append(i) + return response + + @classmethod + def combine_two_ordered_dicts(cls, dict_a, dict_b): + """ + Combine OrderedDict a with b by starting with A and overwriting with + items from b. Attempt to preserve order + """ + if not dict_a: + return dict_b if dict_b else OrderedDict() + if not dict_b: + return dict_a + response = OrderedDict(dict_a.items()) + for key, value in dict_b.items(): + response[key] = value + return response + + @classmethod + def combine_ordered_dicts(cls, *args): + """ + Combine all dict arguments overwriting former with items from latter. + Attempt to preserve order + """ + response = OrderedDict() + for arg in args: + response = cls.combine_two_ordered_dicts(response, arg) + return response + + +class UrlUtils(object): + + reg_netloc = r'(?P[^:]+)(:(?P\d+))?' + + @classmethod + def get_query_list(cls, url): + """Return the list of queries in the url.""" + return parse_qsl(urlparse(url).query) + + @classmethod + def get_query_dict_singular(cls, url): + """ + Return an ordered mapping from each key in the query string to a + singular value. + """ + query_list = cls.get_query_list(url) + return OrderedDict(query_list) + # query_dict = parse_qs(urlparse(url).query) + # query_dict_singular = dict([ + # (key, value[0]) for key, value in query_dict.items() + # ]) + # return query_dict_singular + + @classmethod + def set_query_singular(cls, url, key, value): + """ Sets or overrides a single query in a url """ + query_dict_singular = cls.get_query_dict_singular(url) + # print "setting key %s to value %s" % (key, value) + query_dict_singular[key] = value + # print query_dict_singular + query_string = urlencode(query_dict_singular) + # print "new query string", query_string + return cls.substitute_query(url, query_string) + + @classmethod + def get_query_singular(cls, url, key, default=None): + """ Gets the value of a single query in a url """ + url_params = parse_qs(urlparse(url).query) + values = url_params.get(key, [default]) + assert len(values) == 1, \ + "ambiguous value, could not get singular for key: %s" % key + return values[0] + + @classmethod + def del_query_singular(cls, url, key): + """ deletes a singular key from the query string """ + query_dict_singular = cls.get_query_dict_singular(url) + if key in query_dict_singular: + del query_dict_singular[key] + query_string = urlencode(query_dict_singular) + url = cls.substitute_query(url, query_string) + return url + + @classmethod + def split_url_query_singular(cls, url): + query_dict_singular = cls.get_query_dict_singular(url) + split_url = cls.substitute_query(url) + return split_url, query_dict_singular + + @classmethod + def substitute_query(cls, url, query_string=None): + """ Replaces the query string in the url with the provided string or + removes the query string if none is provided """ + if not query_string: + query_string = '' + + urlparse_result = urlparse(url) + + return urlunparse(URLParseResult( + scheme=urlparse_result.scheme, + netloc=urlparse_result.netloc, + path=urlparse_result.path, + params=urlparse_result.params, + query=query_string, + fragment=urlparse_result.fragment + )) + + @classmethod + def add_query(cls, url, new_key, new_value): + """ adds a query parameter to the given url """ + new_query_item = '%s=%s' % (quote(str(new_key)), quote(str(new_value))) + # new_query_item = '='.join([quote(new_key), quote(new_value)]) + new_query_string = "&".join(SeqUtils.filter_true([ + urlparse(url).query, + new_query_item + ])) + return cls.substitute_query(url, new_query_string) + + @classmethod + def is_ssl(cls, url): + return urlparse(url).scheme == 'https' + + @classmethod + def join_components(cls, components): + return reduce(posixpath.join, SeqUtils.filter_true(components)) + + @staticmethod + def get_value_like_as_php(val): + """ Prepare value for quote """ + try: + base = basestring + except NameError: + base = (str, bytes) + + if isinstance(val, base): + return val + elif isinstance(val, bool): + return "1" if val else "" + elif isinstance(val, int): + return str(val) + elif isinstance(val, float): + return str(int(val)) if val % 1 == 0 else str(val) + else: + return "" + + @staticmethod + def beautify_response(response): + """ Returns a beautified response in the default locale """ + content_type = 'html' + try: + content_type = getattr(response, 'headers', {}).get( + 'Content-Type', content_type) + except: + pass + if 'html' in content_type.lower(): + return BeautifulSoup(response.text, 'lxml').prettify().encode( + errors='backslashreplace') + else: + return response.text + + @classmethod + def remove_port(cls, url): + """ Remove the port number from a URL""" + + urlparse_result = urlparse(url) + + return urlunparse(URLParseResult( + scheme=urlparse_result.scheme, + netloc=re.sub(r':\d+', r'', urlparse_result.netloc), + path=urlparse_result.path, + params=urlparse_result.params, + query=urlparse_result.query, + fragment=urlparse_result.fragment + )) + + @classmethod + def remove_default_port(cls, url, defaults=None): + """ Remove the port number from a URL if it is a default port. """ + if defaults is None: + defaults = { + 'http': 80, + 'https': 443 + } + + urlparse_result = urlparse(url) + match = re.match( + cls.reg_netloc, + urlparse_result.netloc + ) + assert match, "netloc %s should match regex %s" + if match.groupdict().get('port'): + hostname = match.groupdict()['hostname'] + port = int(match.groupdict()['port']) + scheme = urlparse_result.scheme.lower() + + if defaults[scheme] == port: + return urlunparse(URLParseResult( + scheme=urlparse_result.scheme, + netloc=hostname, + path=urlparse_result.path, + params=urlparse_result.params, + query=urlparse_result.query, + fragment=urlparse_result.fragment + )) + return urlunparse(URLParseResult( + scheme=urlparse_result.scheme, + netloc=urlparse_result.netloc, + path=urlparse_result.path, + params=urlparse_result.params, + query=urlparse_result.query, + fragment=urlparse_result.fragment + )) + + @classmethod + def lower_scheme(cls, url): + """ ensure the scheme of the url is lowercase. """ + urlparse_result = urlparse(url) + return urlunparse(URLParseResult( + scheme=urlparse_result.scheme.lower(), + netloc=urlparse_result.netloc, + path=urlparse_result.path, + params=urlparse_result.params, + query=urlparse_result.query, + fragment=urlparse_result.fragment + )) + + @classmethod + def normalize_str(cls, string): + """ Normalize string for the purposes of url query parameters. """ + return quote(string, '~') + + @classmethod + def normalize_params(cls, params): + """ + Normalize parameters. + + Works with RFC 5849 logic. params is a list of key, value pairs. + """ + if isinstance(params, dict): + params = params.items() + params = [ + ( + cls.normalize_str(key), + cls.normalize_str(UrlUtils.get_value_like_as_php(value)) + ) for key, value in params + ] + + response = params + return response + + @classmethod + def sorted_params(cls, params): + """ + Sort parameters. + + works with RFC 5849 logic. params is a list of key, value pairs + """ + + if isinstance(params, dict): + params = params.items() + + if not params: + return params + # return sorted(params) + ordered = [] + params_sorting = [] + for i, (key, value) in enumerate(params): + base_key = key.split('[')[0] + params_sorting.append((base_key, value, i, key)) + + for _, value, _, key in sorted(params_sorting): + ordered.append((key, value)) + + return ordered + + @classmethod + def unique_params(cls, params): + if isinstance(params, dict): + params = params.items() + + if not params: + return params + + unique_params = [] + seen_keys = [] + for key, value in params: + if key not in seen_keys: + unique_params.append((key, value)) + seen_keys.append(key) + return unique_params + + @classmethod + def flatten_params(cls, params): + if isinstance(params, dict): + params = params.items() + params = cls.normalize_params(params) + params = cls.sorted_params(params) + params = cls.unique_params(params) + return "&".join(["%s=%s" % (key, value) for key, value in params]) diff --git a/wordpress/transport.py b/wordpress/transport.py new file mode 100644 index 0000000..3f05a97 --- /dev/null +++ b/wordpress/transport.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- + +""" +Wordpress Requests Class +""" + +__title__ = "wordpress-requests" + +import logging +from pprint import pformat + +from requests import Session + +from wordpress import __default_api__, __default_api_version__, __version__ +from wordpress.helpers import SeqUtils, StrUtils, UrlUtils + + +class API_Requests_Wrapper(object): + """ provides a wrapper for making requests that handles session info """ + + def __init__(self, url, **kwargs): + self.logger = logging.getLogger(__name__) + self.url = url + self.api = kwargs.get("api", __default_api__) + self.api_version = kwargs.get("version", __default_api_version__) + self.timeout = kwargs.get("timeout", 5) + self.verify_ssl = kwargs.get("verify_ssl", True) + self.session = Session() + self.headers = kwargs.get("headers", {}) + + @property + def is_ssl(self): + return UrlUtils.is_ssl(self.url) + + @property + def api_url(self): + components = [ + self.url, + self.api + ] + return UrlUtils.join_components(components) + + @property + def is_wp_json_v1(self): + return self.api == 'wp-json' and self.api_version == 'wp/v1' + + @property + def api_ver_url(self): + components = [ + self.url, + self.api, + ] + if not self.is_wp_json_v1: + components += [ + self.api_version + ] + return UrlUtils.join_components(components) + + @property + def api_ver_url_no_port(self): + return UrlUtils.remove_port(self.api_ver_url) + + def endpoint_url(self, endpoint): + endpoint = StrUtils.decapitate(endpoint, self.api_ver_url) + endpoint = StrUtils.decapitate(endpoint, self.api_ver_url_no_port) + endpoint = StrUtils.decapitate(endpoint, '/') + components = [ + self.url, + self.api + ] + if not self.is_wp_json_v1: + components += [ + self.api_version + ] + components += [ + endpoint + ] + return UrlUtils.join_components(components) + + def request( + self, method, url, auth=None, params=None, data=None, **kwargs + ): + headers = { + "user-agent": "Wordpress API Client-Python/%s" % __version__, + "accept": "application/json" + } + if data is not None: + headers["content-type"] = "application/json;charset=utf-8" + headers = SeqUtils.combine_ordered_dicts( + headers, + self.headers + ) + headers = SeqUtils.combine_ordered_dicts( + headers, + kwargs.get('headers', {}) + ) + + request_kwargs = dict( + method=method, + url=url, + headers=headers, + verify=self.verify_ssl, + timeout=kwargs.get('timeout', self.timeout), + ) + request_kwargs.update(kwargs) + if auth is not None: + request_kwargs['auth'] = auth + if params is not None: + request_kwargs['params'] = params + if data is not None: + request_kwargs['data'] = data + self.logger.debug("request_kwargs:\n%s" % pformat([ + (key, repr(value)[:1000]) for key, value in request_kwargs.items() + ])) + response = self.session.request( + **request_kwargs + ) + self.logger.debug("response_code:\n%s" % pformat(response.status_code)) + try: + response_json = response.json() + self.logger.debug("response_json:\n%s" % + (pformat(response_json)[:1000])) + except ValueError: + response_text = response.text + self.logger.debug("response_text:\n%s" % (response_text[:1000])) + response_headers = {} + if hasattr(response, 'headers'): + response_headers = response.headers + self.logger.debug("response_headers:\n%s" % pformat(response_headers)) + response_links = {} + if hasattr(response, 'links') and response.links: + response_links = response.links + self.logger.debug("response_links:\n%s" % pformat(response_links)) + + return response + + def get(self, *args, **kwargs): + return self.request("GET", *args, **kwargs) + + def post(self, *args, **kwargs): + return self.request("POST", *args, **kwargs)