@@ -18,6 +18,7 @@ def default_client_options(**kwargs):
18
18
def flight_client_options (** kwargs ):
19
19
return kwargs # You can replace this with a specific data structure if needed
20
20
21
+
21
22
class InfluxDBClient3 :
22
23
def __init__ (
23
24
self ,
@@ -47,6 +48,7 @@ def __init__(
47
48
"""
48
49
self ._org = org
49
50
self ._database = database
51
+ self ._token = token
50
52
self ._write_client_options = write_client_options if write_client_options is not None else default_client_options (write_options = SYNCHRONOUS )
51
53
52
54
# Extracting the hostname from URL if provided
@@ -55,32 +57,34 @@ def __init__(
55
57
56
58
self ._client = _InfluxDBClient (
57
59
url = f"https://{ host } " ,
58
- token = token ,
60
+ token = self . _token ,
59
61
org = self ._org ,
60
62
** kwargs )
61
63
62
64
self ._write_api = _WriteApi (influxdb_client = self ._client , ** self ._write_client_options )
63
65
self ._flight_client_options = flight_client_options or {}
64
66
self ._flight_client = FlightClient (f"grpc+tls://{ host } :443" , ** self ._flight_client_options )
65
67
66
- # Create an authorization header
67
- self ._options = FlightCallOptions (headers = [(b"authorization" , f"Bearer { token } " .encode ('utf-8' ))])
68
68
69
- def write (self , record = None , ** kwargs ):
69
+
70
+ def write (self , record = None , database = None ,** kwargs ):
70
71
"""
71
72
Write data to InfluxDB.
72
73
73
74
:param record: The data point(s) to write.
74
75
:type record: Point or list of Point objects
75
76
:param kwargs: Additional arguments to pass to the write API.
76
77
"""
78
+ if database is None :
79
+ database = self ._database
80
+
77
81
try :
78
- self ._write_api .write (bucket = self . _database , record = record , ** kwargs )
82
+ self ._write_api .write (bucket = database , record = record , ** kwargs )
79
83
except InfluxDBError as e :
80
84
raise e
81
85
82
86
83
- def write_file (self , file , measurement_name = None , tag_columns = None , timestamp_column = 'time' , ** kwargs ):
87
+ def write_file (self , file , measurement_name = None , tag_columns = None , timestamp_column = 'time' , database = None , ** kwargs ):
84
88
"""
85
89
Write data from a file to InfluxDB.
86
90
@@ -94,15 +98,18 @@ def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_co
94
98
:type timestamp_column: str
95
99
:param kwargs: Additional arguments to pass to the write API.
96
100
"""
101
+ if database is None :
102
+ database = self ._database
103
+
97
104
try :
98
105
table = upload_file (file ).load_file ()
99
106
df = table .to_pandas () if isinstance (table , pa .Table ) else table
100
- self ._process_dataframe (df , measurement_name , tag_columns or [], timestamp_column )
107
+ self ._process_dataframe (df , measurement_name , tag_columns or [], timestamp_column , database = database , ** kwargs )
101
108
except Exception as e :
102
109
raise e
103
110
104
111
105
- def _process_dataframe (self , df , measurement_name , tag_columns , timestamp_column ):
112
+ def _process_dataframe (self , df , measurement_name , tag_columns , timestamp_column , database , ** kwargs ):
106
113
# This function is factored out for clarity.
107
114
# It processes a DataFrame before writing to InfluxDB.
108
115
@@ -120,12 +127,12 @@ def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column
120
127
print ("'measurement' column not found in the dataframe." )
121
128
else :
122
129
df = df .drop (columns = ['measurement' ], errors = 'ignore' )
123
- self ._write_api .write (bucket = self . _database , record = df ,
130
+ self ._write_api .write (bucket = database , record = df ,
124
131
data_frame_measurement_name = measurement_name ,
125
132
data_frame_tag_columns = tag_columns ,
126
- data_frame_timestamp_column = timestamp_column )
133
+ data_frame_timestamp_column = timestamp_column , ** kwargs )
127
134
128
- def query (self , query , language = "sql" , mode = "all" ):
135
+ def query (self , query , language = "sql" , mode = "all" , database = None , ** kwargs ):
129
136
"""
130
137
Query data from InfluxDB.
131
138
@@ -137,10 +144,15 @@ def query(self, query, language="sql", mode="all"):
137
144
:type mode: str
138
145
:return: The queried data.
139
146
"""
147
+ if database is None :
148
+ database = self ._database
149
+
140
150
try :
141
- ticket_data = {"database" : self ._database , "sql_query" : query , "query_type" : language }
151
+ # Create an authorization header
152
+ _options = FlightCallOptions (headers = [(b"authorization" , f"Bearer { self ._token } " .encode ('utf-8' ))], ** kwargs )
153
+ ticket_data = {"database" : database , "sql_query" : query , "query_type" : language }
142
154
ticket = Ticket (json .dumps (ticket_data ).encode ('utf-8' ))
143
- flight_reader = self ._flight_client .do_get (ticket , self . _options )
155
+ flight_reader = self ._flight_client .do_get (ticket , _options )
144
156
145
157
mode_func = {
146
158
"all" : flight_reader .read_all ,
0 commit comments