diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..a90b1009 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +influxdb.egg-info/ +*.pyc +*.py.swp +.tox diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 00000000..bb686f7e --- /dev/null +++ b/.travis.yml @@ -0,0 +1,9 @@ +language: python +python: + - "3.3" + - "3.2" + - "2.7" +install: + - "pip install -r requirements.txt --use-mirrors" + - "pip install -r test-requirements.txt --use-mirrors" +script: nosetests diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 00000000..44ebdabd --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,2 @@ +include requirements.txt +include test-requirements.txt diff --git a/README.md b/README.md index 9976fe41..feeff885 100644 --- a/README.md +++ b/README.md @@ -2,3 +2,18 @@ influxdb-python =============== Python client for InfluxDB + +For developers +-------------- + +To test influxdb-python with multiple version of Python, you can use tox: + +```` +$ tox +```` + +If you don't have all Python version listed in tox.ini, then + +```` +$ tox -e py27 +```` diff --git a/examples/tutorial.py b/examples/tutorial.py new file mode 100644 index 00000000..2f4319ba --- /dev/null +++ b/examples/tutorial.py @@ -0,0 +1,67 @@ +import argparse + +from influxdb import InfluxDBClient + + +def main(host=None, port=None): + user = 'root' + password = 'root' + dbname = 'example' + dbuser = 'smly' + dbuser_password = 'my_secret_password' + query = 'select column_one from foo;' + json_body = [ + { + "points": [ + ["1", 1, 1.0], + ["2", 2, 2.0] + ], + "name": "foo", + "columns": ["column_one", "column_two", "column_three"] + } +] + + + client = InfluxDBClient(host, port, user, password, dbname) + + print("Create database: " + dbname) + client.create_database(dbname) + + dbusers = client.get_database_users() + print("Get list of database users: {0}".format(dbusers)) + + print("Add database user: " + dbuser) + client.add_database_user(dbuser, dbuser_password) + + dbusers = client.get_database_users() + print("Get list of database users again: {0}".format(dbusers)) + + print("Swtich user: " + dbuser) + client.switch_user(dbuser, dbuser_password) + + print("Write points: {0}".format(json_body)) + client.write_points(json_body) + + print("Queying data: " + query) + result = client.query(query) + + print("Result: {0}".format(result)) + + print("Swtich user: " + user) + client.switch_user(user, password) + + print("Delete database: " + dbname) + client.delete_database(dbname) + + +def parse_args(): + parser = argparse.ArgumentParser( + description='example code to play with InfluxDB') + parser.add_argument('--host', type=str, required=True) + parser.add_argument('--port', type=int, required=True) + return parser.parse_args() + + +if __name__ == '__main__': + args = parse_args() + main(host=args.host, port=args.port) diff --git a/influxdb/__init__.py b/influxdb/__init__.py new file mode 100644 index 00000000..e31f65b7 --- /dev/null +++ b/influxdb/__init__.py @@ -0,0 +1,7 @@ +# -*- coding: utf-8 -*- +from influxdb.client import InfluxDBClient + + +__all__ = ['InfluxDBClient'] + +__version__ = '0.1.1' diff --git a/influxdb/client.py b/influxdb/client.py new file mode 100644 index 00000000..90d1ff3a --- /dev/null +++ b/influxdb/client.py @@ -0,0 +1,529 @@ +# -*- coding: utf-8 -*- +""" +python client for influxdb +""" +import json + +from six.moves.urllib.parse import urlencode + +import requests + + +class InfluxDBClient(object): + """ + InfluxDB Client + """ + + def __init__(self, host, port, username, password, database): + """ + Initialize client + """ + self._host = host + self._port = port + self._username = username + self._password = password + self._database = database + self._baseurl = "http://{0}:{1}".format(self._host, self._port) + + self._headers = { + 'Content-type': 'application/json', + 'Accept': 'text/plain'} + + # Change member variables + + def switch_db(self, database): + """ + Change client database + + Parameters + ---------- + database : string + """ + self._database = database + + def switch_user(self, username, password): + """ + Change client username + + Parameters + ---------- + username : string + password : string + """ + self._username = username + self._password = password + + # Writing Data + # + # Assuming you have a database named foo_production you can write data + # by doing a POST to /db/foo_production/series?u=some_user&p=some_password + # with a JSON body of points. + + def write_points(self, data): + """ + Write to multiple time series names + """ + response = requests.post( + "{0}/db/{1}/series?u={2}&p={3}".format( + self._baseurl, + self._database, + self._username, + self._password), + data=json.dumps(data), + headers=self._headers) + + if response.status_code == 200: + return True + else: + raise Exception( + "{0}: {1}".format(response.status_code, response.content)) + + def write_points_with_precision(self, data, time_precision='s'): + """ + Write to multiple time series names + """ + if time_precision not in ['s', 'm', 'u']: + raise Exception( + "Invalid time precision is given. (use 's','m' or 'u')") + + url_format = "{0}/db/{1}/series?u={2}&p={3}&time_precision={4}" + + response = requests.post(url_format.format( + self._baseurl, + self._database, + self._username, + self._password, + time_precision), + data=json.dumps(data), + headers=self._headers) + + if response.status_code == 200: + return True + else: + raise Exception( + "{0}: {1}".format(response.status_code, response.content)) + + # One Time Deletes + + def delete_points(self, name, + regex=None, start_epoch=None, end_epoch=None): + """ + TODO: Delete a range of data + + 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, + but it is documented in http://influxdb.org/docs/api/http.html. + See also: src/api/http/api.go:l57 + """ + raise NotImplementedError() + + # Regularly Scheduled Deletes + + def create_scheduled_delete(self, json_body): + """ + TODO: Create scheduled delete + + 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, + but it is documented in http://influxdb.org/docs/api/http.html. + See also: src/api/http/api.go:l57 + """ + raise NotImplementedError() + + # get list of deletes + # curl http://localhost:8086/db/site_dev/scheduled_deletes + # + # remove a regularly scheduled delete + # curl -X DELETE http://localhost:8086/db/site_dev/scheduled_deletes/:id + + def get_list_scheduled_delete(self): + """ + TODO: Get list of scheduled deletes + + 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, + but it is documented in http://influxdb.org/docs/api/http.html. + See also: src/api/http/api.go:l57 + """ + raise NotImplementedError() + + def remove_scheduled_delete(self, delete_id): + """ + TODO: Remove scheduled delete + + 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, + but it is documented in http://influxdb.org/docs/api/http.html. + See also: src/api/http/api.go:l57 + """ + raise NotImplementedError() + + # Querying Data + # + # GET db/:name/series. It takes five parameters + def query(self, query, time_precision='s', chunked=False): + """ + Quering data + """ + if time_precision not in ['s', 'm', 'u']: + raise Exception( + "Invalid time precision is given. (use 's','m' or 'u')") + + if chunked is True: + chunked_param = 'true' + else: + chunked_param = 'false' + + encoded_query = urlencode({ + 'q': query}) + + url_format = "{0}/db/{1}/series?{2}&u={3}&p={4}" + url_format += "&time_precision={5}&chunked={6}" + + response = requests.get(url_format.format( + self._baseurl, + self._database, + encoded_query, + self._username, + self._password, + time_precision, + chunked_param)) + + if response.status_code == 200: + return response.content + else: + raise Exception( + "{0}: {1}".format(response.status_code, response.content)) + + # Creating and Dropping Databases + # + # ### create a database + # curl -X POST http://localhost:8086/db -d '{"name": "site_development"}' + # + # ### drop a database + # curl -X DELETE http://localhost:8086/db/site_development + + def create_database(self, database): + """ + Create a database + + Parameters + ---------- + database: string + database name + """ + response = requests.post("{0}/db?u={1}&p={2}".format( + self._baseurl, + self._username, + self._password), + data=json.dumps({'name': database}), + headers=self._headers) + + if response.status_code == 201: + return True + else: + raise Exception( + "{0}: {1}".format(response.status_code, response.content)) + + def delete_database(self, database): + """ + Drop a database + + Parameters + ---------- + database: string + database name + """ + response = requests.delete("{0}/db/{1}?u={2}&p={3}".format( + self._baseurl, + database, + self._username, + self._password)) + + if response.status_code == 204: + return True + else: + raise Exception( + "{0}: {1}".format(response.status_code, response.content)) + + # Security + # get list of cluster admins + # curl http://localhost:8086/cluster_admins?u=root&p=root + + # add cluster admin + # curl -X POST http://localhost:8086/cluster_admins?u=root&p=root \ + # -d '{"username": "paul", "password": "i write teh docz"}' + + # update cluster admin password + # curl -X POST http://localhost:8086/cluster_admins/paul?u=root&p=root \ + # -d '{"password": "new pass"}' + + # delete cluster admin + # curl -X DELETE http://localhost:8086/cluster_admins/paul?u=root&p=root + + # Database admins, with a database name of site_dev + # get list of database admins + # curl http://localhost:8086/db/site_dev/admins?u=root&p=root + + # add database admin + # curl -X POST http://localhost:8086/db/site_dev/admins?u=root&p=root \ + # -d '{"username": "paul", "password": "i write teh docz"}' + + # update database admin password + # curl -X POST http://localhost:8086/db/site_dev/admins/paul?u=root&p=root\ + # -d '{"password": "new pass"}' + + # delete database admin + # curl -X DELETE \ + # http://localhost:8086/db/site_dev/admins/paul?u=root&p=root + + def get_list_cluster_admins(self): + """ + Get list of cluster admins + """ + response = requests.get( + "{0}/cluster_admins?u={1}&p={2}".format( + self._baseurl, + self._username, + self._password)) + + if response.status_code == 200: + return json.loads(response.content) + else: + raise Exception( + "{0}: {1}".format(response.status_code, response.content)) + + def add_cluster_admin(self, new_username, new_password): + """ + Add cluster admin + """ + response = requests.post( + "{0}/cluster_admins?u={1}&p={2}".format( + self._baseurl, + self._username, + self._password), + data=json.dumps({ + 'username': new_username, + 'password': new_password}), + headers=self._headers) + + if response.status_code == 200: + return True + else: + raise Exception( + "{0}: {1}".format(response.status_code, response.content)) + + def update_cluster_admin_password(self, username, new_password): + """ + Update cluster admin password + """ + response = requests.post( + "{0}/cluster_admins/{1}?u={2}&p={3}".format( + self._baseurl, + username, + self._username, + self._password), + data=json.dumps({ + 'password': new_password}), + headers=self._headers) + + if response.status_code == 200: + return True + else: + raise Exception( + "{0}: {1}".format(response.status_code, response.content)) + + def delete_cluster_admin(self, username): + """ + Delete cluster admin + """ + response = requests.delete("{0}/cluster_admins/{1}?u={2}&p={3}".format( + self._baseurl, + username, + self._username, + self._password)) + + if response.status_code == 204: + return True + else: + raise Exception( + "{0}: {1}".format(response.status_code, response.content)) + + def set_database_admin(self, username): + """ + Set user as database admin + """ + response = requests.post( + "{0}/db/{1}/admins/{2}?u={3}&p={4}".format( + self._baseurl, + self._database, + username, + self._username, + self._password)) + if response.status_code == 200: + return True + else: + raise Exception( + "{0}: {1}".format(response.status_code, response.content)) + + def unset_database_admin(self, username): + """ + Unset user as database admin + """ + response = requests.delete( + "{0}/db/{1}/admins/{2}?u={3}&p={4}".format( + self._baseurl, + self._database, + username, + self._username, + self._password)) + if response.status_code == 200: + return True + else: + raise Exception( + "{0}: {1}".format(response.status_code, response.content)) + + def get_list_database_admins(self): + """ + TODO: Get list of database admins + + 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, + but it is documented in http://influxdb.org/docs/api/http.html. + See also: src/api/http/api.go:l57 + """ + raise NotImplementedError() + + def add_database_admin(self, new_username, new_password): + """ + TODO: Add cluster admin + + 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, + but it is documented in http://influxdb.org/docs/api/http.html. + See also: src/api/http/api.go:l57 + """ + raise NotImplementedError() + + def update_database_admin_password(self, username, new_password): + """ + TODO: Update database admin password + + 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, + but it is documented in http://influxdb.org/docs/api/http.html. + See also: src/api/http/api.go:l57 + """ + raise NotImplementedError() + + def delete_database_admin(self, username): + """ + TODO: Delete database admin + + 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, + but it is documented in http://influxdb.org/docs/api/http.html. + See also: src/api/http/api.go:l57 + """ + raise NotImplementedError() + + ### + # Limiting User Access + + # Database users + # get list of database users + # curl http://localhost:8086/db/site_dev/users?u=root&p=root + + # add database user + # curl -X POST http://localhost:8086/db/site_dev/users?u=root&p=root \ + # -d '{"username": "paul", "password": "i write teh docz"}' + + # update database user password + # curl -X POST http://localhost:8086/db/site_dev/users/paul?u=root&p=root \ + # -d '{"password": "new pass"}' + + # delete database user + # curl -X DELETE http://localhost:8086/db/site_dev/users/paul?u=root&p=root + + def get_database_users(self): + """ + Get list of database users + """ + response = requests.get( + "{0}/db/{1}/users?u={2}&p={3}".format( + self._baseurl, + self._database, + self._username, + self._password)) + + if response.status_code == 200: + return json.loads(response.content) + else: + raise Exception( + "{0}: {1}".format(response.status_code, response.content)) + + def add_database_user(self, new_username, new_password): + """ + Add database user + """ + response = requests.post( + "{0}/db/{1}/users?u={2}&p={3}".format( + self._baseurl, + self._database, + self._username, + self._password), + data=json.dumps({ + 'username': new_username, + 'password': new_password}), + headers=self._headers) + + if response.status_code == 200: + return True + else: + raise Exception( + "{0}: {1}".format(response.status_code, response.content)) + + def update_database_user_password(self, username, new_password): + """ + Update password + """ + response = requests.post( + "{0}/db/{1}/users/{2}?u={3}&p={4}".format( + self._baseurl, + self._database, + username, + self._username, + self._password), + data=json.dumps({ + 'password': new_password}), + headers=self._headers) + + if response.status_code == 200: + if username == self._username: + self._password = new_password + return True + else: + raise Exception( + "{0}: {1}".format(response.status_code, response.content)) + + def delete_database_user(self, username): + """ + Delete database user + """ + response = requests.delete( + "{0}/db/{1}/users/{2}?u={3}&p={4}".format( + self._baseurl, + self._database, + username, + self._username, + self._password)) + + if response.status_code == 200: + return True + else: + raise Exception( + "{0}: {1}".format(response.status_code, response.content)) + + # update the user by POSTing to db/site_dev/users/paul + + def update_permission(self, username, json_body): + """ + TODO: Update read/write permission + + 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, + but it is documented in http://influxdb.org/docs/api/http.html. + See also: src/api/http/api.go:l57 + """ + raise NotImplementedError() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 00000000..c7d0ccea --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +six +requests diff --git a/setup.py b/setup.py new file mode 100755 index 00000000..8bff30ad --- /dev/null +++ b/setup.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +try: + import distribute_setup + distribute_setup.use_setuptools() +except: + pass + +try: + from setuptools import setup, find_packages +except ImportError: + from distutils.core import setup + +import os +import re + + +with open(os.path.join(os.path.dirname(__file__), + 'influxdb', '__init__.py')) as f: + version = re.search("__version__ = '([^']+)'", f.read()).group(1) + +with open('requirements.txt', 'r') as f: + requires = [x.strip() for x in f if x.strip()] + +with open('test-requirements.txt', 'r') as f: + test_requires = [x.strip() for x in f if x.strip()] + +setup( + name='influxdb', + version=version, + description="influxdb client", + packages=find_packages(exclude=['tests']), + test_suite='tests', + tests_require=test_requires, + install_requires=requires, + extras_require={'test': test_requires}, +) diff --git a/test-requirements.txt b/test-requirements.txt new file mode 100644 index 00000000..fac3bb36 --- /dev/null +++ b/test-requirements.txt @@ -0,0 +1,3 @@ +requests +nose +mock diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..40a96afc --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/tests/influxdb/__init__.py b/tests/influxdb/__init__.py new file mode 100644 index 00000000..40a96afc --- /dev/null +++ b/tests/influxdb/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/tests/influxdb/client_test.py b/tests/influxdb/client_test.py new file mode 100644 index 00000000..811548d3 --- /dev/null +++ b/tests/influxdb/client_test.py @@ -0,0 +1,197 @@ +# -*- coding: utf-8 -*- +""" +unit tests +""" +import json + +import requests +from nose.tools import raises +from mock import patch + +from influxdb import InfluxDBClient + + +def _build_response_object(status_code=200, content=""): + resp = requests.Response() + resp.status_code = status_code + resp._content = content + return resp + + +class TestInfluxDBClient(object): + def test_switch_db(self): + cli = InfluxDBClient('host', 8086, 'username', 'password', 'database') + cli.switch_db('another_database') + assert cli._database == 'another_database' + + def test_switch_user(self): + cli = InfluxDBClient('host', 8086, 'username', 'password', 'database') + cli.switch_user('another_username', 'another_password') + assert cli._username == 'another_username' + assert cli._password == 'another_password' + + def test_write_points(self): + data = [ + { + "points": [ + ["1", 1, 1.0], + ["2", 2, 2.0] + ], + "name": "foo", + "columns": ["column_one", "column_two", "column_three"] + } + ] + + with patch.object(requests, 'post') as mocked_post: + mocked_post.return_value = _build_response_object(status_code=200) + cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') + assert cli.write_points(data) is True + + @raises(Exception) + def test_write_points_fails(self): + with patch.object(requests, 'post') as mocked_post: + mocked_post.return_value = _build_response_object(status_code=500) + cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') + cli.write_points([]) + + def test_write_points_with_precision(self): + data = [ + { + "points": [ + ["1", 1, 1.0], + ["2", 2, 2.0] + ], + "name": "foo", + "columns": ["column_one", "column_two", "column_three"] + } + ] + + with patch.object(requests, 'post') as mocked_post: + mocked_post.return_value = _build_response_object(status_code=200) + cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') + assert cli.write_points_with_precision(data) is True + + @raises(Exception) + def test_write_points_with_precision_fails(self): + with patch.object(requests, 'post') as mocked_post: + mocked_post.return_value = _build_response_object(status_code=500) + cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') + cli.write_points_with_precision([]) + + @raises(NotImplementedError) + def test_delete_points(self): + cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') + cli.delete_points([]) + + @raises(NotImplementedError) + def test_create_scheduled_delete(self): + cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') + cli.create_scheduled_delete([]) + + @raises(NotImplementedError) + def test_get_list_scheduled_delete(self): + cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') + cli.get_list_scheduled_delete() + + @raises(NotImplementedError) + def test_remove_scheduled_delete(self): + cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') + cli.remove_scheduled_delete(1) + + def test_query(self): + expected = """[{"name":"foo","columns":["time","sequence_number","column_one"],"points":[[1383876043,16,"2"],[1383876043,15,"1"],[1383876035,14,"2"],[1383876035,13,"1"]]}]""" + with patch.object(requests, 'get') as mocked_get: + mocked_get.return_value = _build_response_object( + status_code=200, + content=expected) + cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') + result = cli.query('select column_one from foo;') + assert len(json.loads(result)[0]['points']) == 4 + + @raises(Exception) + def test_query_fail(self): + with patch.object(requests, 'get') as mocked_get: + mocked_get.return_value = _build_response_object(status_code=401) + cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') + cli.query('select column_one from foo;') + + def test_create_database(self): + with patch.object(requests, 'post') as mocked_post: + mocked_post.return_value = _build_response_object(status_code=201) + cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') + assert cli.create_database('new_db') is True + + @raises(Exception) + def test_creata_database_fails(self): + with patch.object(requests, 'post') as mocked_post: + mocked_post.return_value = _build_response_object(status_code=401) + cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') + cli.create_database('new_db') + + def test_delete_database(self): + with patch.object(requests, 'delete') as mocked_post: + mocked_post.return_value = _build_response_object(status_code=204) + cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') + assert cli.delete_database('old_db') is True + + @raises(Exception) + def test_delete_database_fails(self): + with patch.object(requests, 'delete') as mocked_post: + mocked_post.return_value = _build_response_object(status_code=401) + cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') + cli.delete_database('old_db') + + def test_get_list_cluster_admins(self): + pass + + def test_add_cluster_admin(self): + pass + + def test_update_cluster_admin_password(self): + pass + + def test_delete_cluster_admin(self): + pass + + def test_set_database_admin(self): + pass + + def test_unset_database_admin(self): + pass + + @raises(NotImplementedError) + def test_get_list_database_admins(self): + cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') + cli.get_list_database_admins() + + @raises(NotImplementedError) + def test_add_database_admin(self): + cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') + cli.add_database_admin('admin', 'admin_secret_password') + + @raises(NotImplementedError) + def test_update_database_admin_password(self): + cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') + cli.update_database_admin_password('admin', 'admin_secret_password') + + @raises(NotImplementedError) + def test_delete_database_admin(self): + cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') + cli.delete_database_admin('admin') + + def test_get_database_user(self): + pass + + def test_add_database_user(self): + pass + + def test_update_database_user_password(self): + pass + + def test_delete_database_user(self): + pass + + @raises(NotImplementedError) + def test_update_permission(self): + cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') + cli.update_permission('admin', []) diff --git a/tox.ini b/tox.ini new file mode 100644 index 00000000..f5128281 --- /dev/null +++ b/tox.ini @@ -0,0 +1,14 @@ +[tox] +envlist = py33, py27, flake8 + +[testenv] +commands = + pip install -r test-requirements.txt + nosetests + +[testenv:flake8] +deps = + flake8 + pep8-naming + +commands = flake8 influxdb