8000 edited for influxdb_relay functions · jjneojiajun/influxdb-python@2719fb0 · GitHub
[go: up one dir, main page]

Skip to content

Commit 2719fb0

Browse files
author
Neo Jia Jun
committed
edited for influxdb_relay functions
1 parent cb15c2e commit 2719fb0

33 files changed

+161
-29
lines changed

.vscode/settings.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"python.pythonPath": "/Users/jjneojiajun/anaconda3/envs/resync_dev/bin/python"
3+
}

influxdb/__init__.py renamed to influxdb_relay/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,4 @@
1818
]
1919

2020

21-
__version__ = '5.2.3'
21+
__version__ = '1.0.0'
File renamed without changes.
File renamed without changes.

influxdb/client.py renamed to influxdb_relay/client.py

Lines changed: 152 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
from six.moves import xrange
2020
from six.moves.urllib.parse import urlparse
2121

22-
from influxdb.line_protocol import make_lines, quote_ident, quote_literal
23-
from influxdb.resultset import ResultSet
22+
from influxdb_relay.line_protocol import make_lines, quote_ident, quote_literal
23+
from influxdb_relay.resultset import ResultSet
2424
from .exceptions import InfluxDBClientError
2525
from .exceptions import InfluxDBServerError
2626

@@ -285,6 +285,7 @@ def request(self, url, method='GET', params=None, data=None,
285285
verify=self._verify_ssl,
286286
timeout=self._timeout
287287
)
288+
288289
break
289290
except (requests.exceptions.ConnectionError,
290291
requests.exceptions.HTTPError,
@@ -637,23 +638,57 @@ def get_list_database(self):
637638
"""
638639
return list(self.query("SHOW DATABASES").get_points())
639640

641+
def admin_request(self, data):
642+
url = "{0}/admin".format(self._baseurl)
643+
644+
# Try to send the requests more than once by default
645+
retry = True
646+
_try = 0
647+
while retry:
648+
try:
649+
response = self._session.request(
650+
method="POST",
651+
url=url,
652+
data=data
653+
)
654+
break
655+
except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError, requests.exceptions.Timeout):
656+
_try += 1
657+
if self._retries != 0:
658+
retry = _try < self._retries
659+
else:
660+
time.sleep((2 ** _try) * random.random() / 100.0)
661+
662+
return response
663+
640664
def create_database(self, dbname):
641665
"""Create a new database in InfluxDB.
642666
643667
:param dbname: the name of the database to create
644668
:type dbname: str
645669
"""
646-
self.query("CREATE DATABASE {0}".format(quote_ident(dbname)),
647-
method="POST")
670+
data = {'q': "CREATE DATABASE {0}".format(quote_ident(dbname))}
671+
672+
response = self.admin_request(data)
673+
674+
if response.status_code == 204:
675+
return "Database {0} has been created".format(quote_ident(dbname))
676+
else:
677+
return "Unable to create DB"
648678

649679
def drop_database(self, dbname):
650680
"""Drop a database from InfluxDB.
651681
652682
:param dbname: the name of the database to drop
653683
:type dbname: str
654684
"""
655-
self.query("DROP DATABASE {0}".format(quote_ident(dbname)),
656-
method="POST")
685+
data = {'q': "DROP DATABASE {0}".format(quote_ident(dbname))}
686+
response = self.admin_request(data)
687+
688+
if response.status_code == 204:
689+
return "Database {0} has been dropped".format(quote_ident(dbname))
690+
else:
691+
return "Unable to drop {0}".format(quote_ident(dbname))
657692

658693
def get_list_measurements(self):
659694
"""Get the list of measurements in InfluxDB.
@@ -679,8 +714,13 @@ def drop_measurement(self, measurement):
679714
:param measurement: the name of the measurement to drop
680715
:type measurement: str
681716
"""
682-
self.query("DROP MEASUREMENT {0}".format(quote_ident(measurement)),
683-
method="POST")
717+
data = {'q': "DROP MEASUREMENT {0}".format(quote_ident(measurement))}
718+
response = self.admin_request(data)
719+
720+
if response.status_code == 204:
721+
return "Measurement {0} has been dropped".format(quote_ident(measurement))
722+
else:
723+
return "Unable to drop {0} measurement".format(quote_ident(measurement))
684724

685725
def create_retention_policy(self, name, duration, replication,
686726
database=None,
@@ -722,7 +762,13 @@ def create_retention_policy(self, name, duration, replication,
722762
if default is True:
723763
query_string += " DEFAULT"
724764

725-
self.query(query_string, method="POST")
765+
data = {'q': query_string}
766+
response = self.admin_request(data)
767+
768+
if response.status_code == 204:
769+
return "RETENTION POLICY {0} ON {1} CREATED".format(quote_ident(name), quote_ident(database or self._database))
770+
else:
771+
return "UNABLE TO CREATE RETENTION POLICY REQUESTED"
726772

727773
def alter_retention_policy(self, name, database=None,
728774
duration=None, replication=None,
@@ -761,7 +807,7 @@ def alter_retention_policy(self, name, database=None,
761807
query_string = (
762808
"ALTER RETENTION POLICY {0} ON {1}"
763809
).format(quote_ident(name),
764-
quote_ident(database or self._database), shard_duration)
810+
quote_ident(database or self._database))
765811
if duration:
766812
query_string += " DURATION {0}".format(duration)
767813
if shard_duration:
@@ -771,7 +817,14 @@ def alter_retention_policy(self, name, database=None,
771817
if default is True:
772818
query_string += " DEFAULT"
773819

774-
self.query(query_string, method="POST")
820+
data = {'q': query_string}
821+
response = self.admin_request(data)
822+
823+
if response.status_code == 204:
824+
return "ALTERED RETENTION POLICY {0} ON {1}".format(quote_ident(name),
825+
quote_ident(database or self._database))
826+
else:
827+
return "UNABLE TO ALTER RETENTION POLICY"
775828

776829
def drop_retention_policy(self, name, database=None):
777830
"""Drop an existing retention policy for a database.
@@ -785,7 +838,14 @@ def drop_retention_policy(self, name, database=None):
785838
query_string = (
786839
"DROP RETENTION POLICY {0} ON {1}"
787840
).format(quote_ident(name), quote_ident(database or self._database))
788-
self.query(query_string, method="POST")
841+
842+
data = {'q': query_string}
843+
response = self.admin_request(data)
844+
845+
if response.status_code == 204:
846+
return "DROPPED RETENTION POLICY {0} ON {1}".format(quote_ident(name), quote_ident(database or self._database))
847+
else:
848+
return "UNABLE TO DROP RETENTION POLICY"
789849

790850
def get_list_retention_policies(self, database=None):
791851
"""Get the list of retention policies for a database.
@@ -851,16 +911,28 @@ def create_user(self, username, password, admin=False):
851911
quote_ident(username), quote_literal(password))
852912
if admin:
853913
text += ' WITH ALL PRIVILEGES'
854-
self.query(text, method="POST")
914+
data = {'q': text}
915+
response = self.admin_request(data)
916+
917+
if response.status_code == 204:
918+
return "{0} is CREATED".format(quote_ident(username))
919+
else:
920+
return "UNABLE TO CREATE USER"
855921

856922
def drop_user(self, username):
857923
"""Drop a user from InfluxDB.
858924
859925
:param username: the username to drop
860926
:type username: str
861927
"""
862-
text = "DROP USER {0}".format(quote_ident(username), method="POST")
863-
self.query(text, method="POST")
928+
text = "DROP USER {0}".format(quote_ident(username))
929+
data = {"q": text}
930+
response = self.admin_request(data)
931+
932+
if response.status_code == 204:
933+
return "{0} is DROPPED".format(quote_ident(username))
934+
else:
935+
return "UNABLE TO DROP USER"
864936

865937
def set_user_password(self, username, password):
866938
"""Change the password of an existing user.
@@ -872,7 +944,13 @@ def set_user_password(self, username, password):
872944
"""
873945
text = "SET PASSWORD FOR {0} = {1}".format(
874946
quote_ident(username), quote_literal(password))
875-
self.query(text)
947+
data = {"q": text}
948+
response = self.admin_request(data)
949+
950+
if response.status_code == 204:
951+
return "{0} PASSWORD SET".format(quote_ident(username))
952+
else:
953+
return "UNABLE TO SET PASSWORD"
876954

877955
def delete_series(self, database=None, measurement=None, tags=None):
878956
"""Delete series from a database.
@@ -898,7 +976,16 @@ def delete_series(self, database=None, measurement=None, tags=None):
898976
tag_eq_list = ["{0}={1}".format(quote_ident(k), quote_literal(v))
899977
for k, v in tags.items()]
900978
query_str += ' WHERE ' + ' AND '.join(tag_eq_list)
901-
self.query(query_str, database=database, method="POST")
979+
980+
data = {"q": query_str}
981+
response = self.admin_request(data)
982+
983+
if response.status_code == 204:
984+
return "SERIES DROPPED SUCCESSFULLY"
985+
else:
986+
return "UNABLE TO DROP SERIES"
987+
988+
902989

903990
def grant_admin_privileges(self, username):
904991
"""Grant cluster administration privileges to a user.
@@ -910,7 +997,13 @@ def grant_admin_privileges(self, username):
910997
and manage users.
911998
"""
912999
text = "GRANT ALL PRIVILEGES TO {0}".format(quote_ident(username))
913-
self.query(text, method="POST")
1000+
data = {"q": text}
1001+
response = self.admin_request(data)
1002+
1003+
if response.status_code == 204:
1004+
return "GRANT ADMIN PRIVILEGES TO {0}".format(quote_ident(username))
1005+
else:
1006+
return "UNABLE TO GRANT ADMIN PRIVILEGES TO {0}".format(quote_ident(username))
9141007

9151008
def revoke_admin_privileges(self, username):
9161009
"""Revoke cluster administration privileges from a user.
@@ -922,7 +1015,13 @@ def revoke_admin_privileges(self, username):
9221015
and manage users.
9231016
"""
9241017
text = "REVOKE ALL PRIVILEGES FROM {0}".format(quote_ident(username))
925-
self.query(text, method="POST")
1018+
data = {"q": text}
1019+
response = self.admin_request(data)
1020+
1021+
if response.status_code == 204:
1022+
return "REVOKE ALL PRIVILEGES FROM {0}".format(quote_ident(username))
1023+
else:
1024+
return "UNABLE TO REVOKE ALL PRIVILEGES FROM {0}".format(quote_ident(username))
9261025

9271026
def grant_privilege(self, privilege, database, username):
9281027
"""Grant a privilege on a database to a user.
@@ -938,7 +1037,15 @@ def grant_privilege(self, privilege, database, username):
9381037
text = "GRANT {0} ON {1} TO {2}".format(privilege,
9391038
quote_ident(database),
9401039
quote_ident(username))
941-
self.query(text, method="POST")
1040+
data = {"q": text}
1041+
response = self.admin_request(data)
1042+
1043+
if response.status_code == 204:
1044+
return "GRANT {0} ON {1} TO {2} SUCCESSFUL".format(privilege,
1045+
quote_ident(database),
1046+
quote_ident(username))
1047+
else:
1048+
return "UNABLE TO GRANT PRIVILEGE"
9421049

9431050
def revoke_privilege(self, privilege, database, username):
9441051
"""Revoke a privilege on a database from a user.
@@ -954,7 +1061,17 @@ def revoke_privilege(self, privilege, database, username):
9541061
text = "REVOKE {0} ON {1} FROM {2}".format(privilege,
9551062
quote_ident(database),
9561063
quote_ident(username))
957-
self.query(text, method="POST")
1064+
data = {"q": text}
1065+
response = self.admin_request(data)
1066+
1067+
if response.status_code == 204:
1068+
return "REVOKE {0} ON {1} FROM {2} SUCCESSFULLY".format(
1069+
privilege,
1070+
quote_ident(database),
1071+
quote_ident(username)
1072+
)
1073+
else:
1074+
return "UNABLE TO REVOKE PRIVILEGE"
9581075

9591076
def get_list_privileges(self, username):
9601077
"""Get the list of all privileges granted to given user.
@@ -1054,7 +1171,13 @@ def create_continuous_query(self, name, select, database=None,
10541171
"CREATE CONTINUOUS QUERY {0} ON {1}{2} BEGIN {3} END"
10551172
).format(quote_ident(name), quote_ident(database or self._database),
10561173
' RESAMPLE ' + resample_opts if resample_opts else '', select)
1057-
self.query(query_string)
1174+
data = {"q": query_string}
1175+
response = self.admin_request(data)
1176+
1177+
if response.status_code == 204:
1178+
return "CREATE CONTINUOUS QUERY SUCCESSFULLY"
1179+
else:
1180+
return "UNABLE TO CREATE CONTINUOUS QUERY"
10581181

10591182
def drop_continuous_query(self, name, database=None):
10601183
"""Drop an existing continuous query for a database.
@@ -1068,7 +1191,13 @@ def drop_continuous_query(self, name, database=None):
10681191
query_string = (
10691192
"DROP CONTINUOUS QUERY {0} ON {1}"
10701193
).format(quote_ident(name), quote_ident(database or self._database))
1071-
self.query(query_string)
1194+
data = {"q": query_string}
1195+
response = self.admin_request(data)
1196+
1197+
if response.status_code == 204:
1198+
return "DROPPED CONTINUOUS QUERY SUCCESSFULLY"
1199+
else:
1200+
return "UNABLE TO DROP CONTINUOUS QUERY"
10721201

10731202
def send_packet(self, packet, protocol='json', time_precision=None):
10741203
"""Send an UDP packet.
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 commit comments

Comments
 (0)
0