19
19
from six .moves import xrange
20
20
from six .moves .urllib .parse import urlparse
21
21
22
- from influxdb .line_protocol import make_lines , quote_ident , quote_literal
23
- from influxdb .resultset import ResultSet
22
+ from influxdb_relay .line_protocol import make_lines , quote_ident , quote_literal
23
+ from influxdb_relay .resultset import ResultSet
24
24
from .exceptions import InfluxDBClientError
25
25
from .exceptions import InfluxDBServerError
26
26
@@ -285,6 +285,7 @@ def request(self, url, method='GET', params=None, data=None,
285
285
verify = self ._verify_ssl ,
286
286
timeout = self ._timeout
287
287
)
288
+
288
289
break
289
290
except (requests .exceptions .ConnectionError ,
290
291
requests .exceptions .HTTPError ,
@@ -637,23 +638,57 @@ def get_list_database(self):
637
638
"""
638
639
return list (self .query ("SHOW DATABASES" ).get_points ())
639
640
641
+ def admin_request (self , data ):
642
+ url = "{0}/admin" .format (self ._baseurl )
643
+
644
+ # Try to send the requests more than once by default
645
+ retry = True
646
+ _try = 0
647
+ while retry :
648
+ try :
649
+ response = self ._session .request (
650
+ method = "POST" ,
651
+ url = url ,
652
+ data = data
653
+ )
654
+ break
655
+ except (requests .exceptions .ConnectionError , requests .exceptions .HTTPError , requests .exceptions .Timeout ):
656
+ _try += 1
657
+ if self ._retries != 0 :
658
+ retry = _try < self ._retries
659
+ else :
660
+ time .sleep ((2 ** _try ) * random .random () / 100.0 )
661
+
662
+ return response
663
+
640
664
def create_database (self , dbname ):
641
665
"""Create a new database in InfluxDB.
642
666
643
667
:param dbname: the name of the database to create
644
668
:type dbname: str
645
669
"""
646
- self .query ("CREATE DATABASE {0}" .format (quote_ident (dbname )),
647
- method = "POST" )
670
+ data = {'q' : "CREATE DATABASE {0}" .format (quote_ident (dbname ))}
671
+
672
+ response = self .admin_request (data )
673
+
674
+ if response .status_code == 204 :
675
+ return "Database {0} has been created" .format (quote_ident (dbname ))
676
+ else :
677
+ return "Unable to create DB"
648
678
649
679
def drop_database (self , dbname ):
650
680
"""Drop a database from InfluxDB.
651
681
652
682
:param dbname: the name of the database to drop
653
683
:type dbname: str
654
684
"""
655
- self .query ("DROP DATABASE {0}" .format (quote_ident (dbname )),
656
- method = "POST" )
685
+ data = {'q' : "DROP DATABASE {0}" .format (quote_ident (dbname ))}
686
+ response = self .admin_request (data )
687
+
688
+ if response .status_code == 204 :
689
+ return "Database {0} has been dropped" .format (quote_ident (dbname ))
690
+ else :
691
+ return "Unable to drop {0}" .format (quote_ident (dbname ))
657
692
658
693
def get_list_measurements (self ):
659
694
"""Get the list of measurements in InfluxDB.
@@ -679,8 +714,13 @@ def drop_measurement(self, measurement):
679
714
:param measurement: the name of the measurement to drop
680
715
:type measurement: str
681
716
"""
682
- self .query ("DROP MEASUREMENT {0}" .format (quote_ident (measurement )),
683
- method = "POST" )
717
+ data = {'q' : "DROP MEASUREMENT {0}" .format (quote_ident (measurement ))}
718
+ response = self .admin_request (data )
719
+
720
+ if response .status_code == 204 :
721
+ return "Measurement {0} has been dropped" .format (quote_ident (measurement ))
722
+ else :
723
+ return "Unable to drop {0} measurement" .format (quote_ident (measurement ))
684
724
685
725
def create_retention_policy (self , name , duration , replication ,
686
726
database = None ,
@@ -722,7 +762,13 @@ def create_retention_policy(self, name, duration, replication,
722
762
if default is True :
723
763
query_string += " DEFAULT"
724
764
725
- self .query (query_string , method = "POST" )
765
+ data = {'q' : query_string }
766
+ response = self .admin_request (data )
767
+
768
+ if response .status_code == 204 :
769
+ return "RETENTION POLICY {0} ON {1} CREATED" .format (quote_ident (name ), quote_ident (database or self ._database ))
770
+ else :
771
+ return "UNABLE TO CREATE RETENTION POLICY REQUESTED"
726
772
727
773
def alter_retention_policy (self , name , database = None ,
728
774
duration = None , replication = None ,
@@ -761,7 +807,7 @@ def alter_retention_policy(self, name, database=None,
761
807
query_string = (
762
808
"ALTER RETENTION POLICY {0} ON {1}"
763
809
).format (quote_ident (name ),
764
- quote_ident (database or self ._database ), shard_duration )
810
+ quote_ident (database or self ._database ))
765
811
if duration :
766
812
query_string += " DURATION {0}" .format (duration )
767
813
if shard_duration :
@@ -771,7 +817,14 @@ def alter_retention_policy(self, name, database=None,
771
817
if default is True :
772
818
query_string += " DEFAULT"
773
819
774
- self .query (query_string , method = "POST" )
820
+ data = {'q' : query_string }
821
+ response = self .admin_request (data )
822
+
823
+ if response .status_code == 204 :
824
+ return "ALTERED RETENTION POLICY {0} ON {1}" .format (quote_ident (name ),
825
+ quote_ident (database or self ._database ))
826
+ else :
827
+ return "UNABLE TO ALTER RETENTION POLICY"
775
828
776
829
def drop_retention_policy (self , name , database = None ):
777
830
"""Drop an existing retention policy for a database.
@@ -785,7 +838,14 @@ def drop_retention_policy(self, name, database=None):
785
838
query_string = (
786
839
"DROP RETENTION POLICY {0} ON {1}"
787
840
).format (quote_ident (name ), quote_ident (database or self ._database ))
788
- self .query (query_string , method = "POST" )
841
+
842
+ data = {'q' : query_string }
843
+ response = self .admin_request (data )
844
+
845
+ if response .status_code == 204 :
846
+ return "DROPPED RETENTION POLICY {0} ON {1}" .format (quote_ident (name ), quote_ident (database or self ._database ))
847
+ else :
848
+ return "UNABLE TO DROP RETENTION POLICY"
789
849
790
850
def get_list_retention_policies (self , database = None ):
791
851
"""Get the list of retention policies for a database.
@@ -851,16 +911,28 @@ def create_user(self, username, password, admin=False):
851
911
quote_ident (username ), quote_literal (password ))
852
912
if admin :
853
913
text += ' WITH ALL PRIVILEGES'
854
- self .query (text , method = "POST" )
914
+ data = {'q' : text }
915
+ response = self .admin_request (data )
916
+
917
+ if response .status_code == 204 :
918
+ return "{0} is CREATED" .format (quote_ident (username ))
919
+ else :
920
+ return "UNABLE TO CREATE USER"
855
921
856
922
def drop_user (self , username ):
857
923
"""Drop a user from InfluxDB.
858
924
859
925
:param username: the username to drop
860
926
:type username: str
861
927
"""
862
- text = "DROP USER {0}" .format (quote_ident (username ), method = "POST" )
863
- self .query (text , method = "POST" )
928
+ text = "DROP USER {0}" .format (quote_ident (username ))
929
+ data = {"q" : text }
930
+ response = self .admin_request (data )
931
+
932
+ if response .status_code == 204 :
933
+ return "{0} is DROPPED" .format (quote_ident (username ))
934
+ else :
935
+ return "UNABLE TO DROP USER"
864
936
865
937
def set_user_password (self , username , password ):
866
938
"""Change the password of an existing user.
@@ -872,7 +944,13 @@ def set_user_password(self, username, password):
872
944
"""
873
945
text = "SET PASSWORD FOR {0} = {1}" .format (
874
946
quote_ident (username ), quote_literal (password ))
875
- self .query (text )
947
+ data = {"q" : text }
948
+ response = self .admin_request (data )
949
+
950
+ if response .status_code == 204 :
951
+ return "{0} PASSWORD SET" .format (quote_ident (username ))
952
+ else :
953
+ return "UNABLE TO SET PASSWORD"
876
954
877
955
def delete_series (self , database = None , measurement = None , tags = None ):
878
956
"""Delete series from a database.
@@ -898,7 +976,16 @@ def delete_series(self, database=None, measurement=None, tags=None):
898
976
tag_eq_list = ["{0}={1}" .format (quote_ident (k ), quote_literal (v ))
899
977
for k , v in tags .items ()]
900
978
query_str += ' WHERE ' + ' AND ' .join (tag_eq_list )
901
- self .query (query_str , database = database , method = "POST" )
979
+
980
+ data = {"q" : query_str }
981
+ response = self .admin_request (data )
982
+
983
+ if response .status_code == 204 :
984
+ return "SERIES DROPPED SUCCESSFULLY"
985
+ else :
986
+ return "UNABLE TO DROP SERIES"
987
+
988
+
902
989
903
990
def grant_admin_privileges (self , username ):
904
991
"""Grant cluster administration privileges to a user.
@@ -910,7 +997,13 @@ def grant_admin_privileges(self, username):
910
997
and manage users.
911
998
"""
912
999
text = "GRANT ALL PRIVILEGES TO {0}" .format (quote_ident (username ))
913
- self .query (text , method = "POST" )
1000
+ data = {"q" : text }
1001
+ response = self .admin_request (data )
1002
+
1003
+ if response .status_code == 204 :
1004
+ return "GRANT ADMIN PRIVILEGES TO {0}" .format (quote_ident (username ))
1005
+ else :
1006
+ return "UNABLE TO GRANT ADMIN PRIVILEGES TO {0}" .format (quote_ident (username ))
914
1007
915
1008
def revoke_admin_privileges (self , username ):
916
1009
"""Revoke cluster administration privileges from a user.
@@ -922,7 +1015,13 @@ def revoke_admin_privileges(self, username):
922
1015
and manage users.
923
1016
"""
924
1017
text = "REVOKE ALL PRIVILEGES FROM {0}" .format (quote_ident (username ))
925
- self .query (text , method = "POST" )
1018
+ data = {"q" : text }
1019
+ response = self .admin_request (data )
1020
+
1021
+ if response .status_code == 204 :
1022
+ return "REVOKE ALL PRIVILEGES FROM {0}" .format (quote_ident (username ))
1023
+ else :
1024
+ return "UNABLE TO REVOKE ALL PRIVILEGES FROM {0}" .format (quote_ident (username ))
926
1025
927
1026
def grant_privilege (self , privilege , database , username ):
928
1027
"""Grant a privilege on a database to a user.
@@ -938,7 +1037,15 @@ def grant_privilege(self, privilege, database, username):
938
1037
text = "GRANT {0} ON {1} TO {2}" .format (privilege ,
939
1038
quote_ident (database ),
940
1039
quote_ident (username ))
941
- self .query (text , method = "POST" )
1040
+ data = {"q" : text }
1041
+ response = self .admin_request (data )
1042
+
1043
+ if response .status_code == 204 :
1044
+ return "GRANT {0} ON {1} TO {2} SUCCESSFUL" .format (privilege ,
1045
+ quote_ident (database ),
1046
+ quote_ident (username ))
1047
+ else :
1048
+ return "UNABLE TO GRANT PRIVILEGE"
942
1049
943
1050
def revoke_privilege (self
341A
span>, privilege , database , username ):
944
1051
"""Revoke a privilege on a database from a user.
@@ -954,7 +1061,17 @@ def revoke_privilege(self, privilege, database, username):
954
1061
text = "REVOKE {0} ON {1} FROM {2}" .format (privilege ,
955
1062
quote_ident (database ),
956
1063
quote_ident (username ))
957
- self .query (text , method = "POST" )
1064
+ data = {"q" : text }
1065
+ response = self .admin_request (data )
1066
+
1067
+ if response .status_code == 204 :
1068
+ return "REVOKE {0} ON {1} FROM {2} SUCCESSFULLY" .format (
1069
+ privilege ,
1070
+ quote_ident (database ),
1071
+ quote_ident (username )
1072
+ )
1073
+ else :
1074
+ return "UNABLE TO REVOKE PRIVILEGE"
958
1075
959
1076
def get_list_privileges (self , username ):
960
1077
"""Get the list of all privileges granted to given user.
@@ -1054,7 +1171,13 @@ def create_continuous_query(self, name, select, database=None,
1054
1171
"CREATE CONTINUOUS QUERY {0} ON {1}{2} BEGIN {3} END"
1055
1172
).format (quote_ident (name ), quote_ident (database or self ._database ),
1056
1173
' RESAMPLE ' + resample_opts if resample_opts else '' , select )
1057
- self .query (query_string )
1174
+ data = {"q" : query_string }
1175
+ response = self .admin_request (data )
1176
+
1177
+ if response .status_code == 204 :
1178
+ return "CREATE CONTINUOUS QUERY SUCCESSFULLY"
1179
+ else :
1180
+ return "UNABLE TO CREATE CONTINUOUS QUERY"
1058
1181
1059
1182
def drop_continuous_query (self , name , database = None ):
1060
1183
"""Drop an existing continuous query for a database.
@@ -1068,7 +1191,13 @@ def drop_continuous_query(self, name, database=None):
1068
1191
query_string = (
1069
1192
"DROP CONTINUOUS QUERY {0} ON {1}"
1070
1193
).format (quote_ident (name ), quote_ident (database or self ._database ))
1071
- self .query (query_string )
1194
+ data = {"q" : query_string }
1195
+ response = self .admin_request (data )
1196
+
1197
+ if response .status_code == 204 :
1198
+ return "DROPPED CONTINUOUS QUERY SUCCESSFULLY"
1199
+ else :
1200
+ return "UNABLE TO DROP CONTINUOUS QUERY"
1072
1201
1073
1202
def send_packet (self , packet , protocol = 'json' , time_precision = None ):
1074
1203
"""Send an UDP packet.
0 commit comments