1
- # influxdb_client_3/__init__.py
2
-
1
+ import json
3
2
import pyarrow as pa
4
- from pyarrow .flight import FlightClient , Ticket , FlightCallOptions
5
- from influxdb_client import InfluxDBClient as _InfluxDBClient
6
- from influxdb_client import WriteOptions as WriteOptions
7
- from influxdb_client .client .write_api import WriteApi as _WriteApi
8
- from influxdb_client .client .write_api import SYNCHRONOUS , ASYNCHRONOUS , PointSettings
3
+ from influxdb_client import InfluxDBClient as _InfluxDBClient , WriteOptions , Point
4
+ from influxdb_client .client .write_api import WriteApi as _WriteApi , SYNCHRONOUS , ASYNCHRONOUS , PointSettings
9
5
from influxdb_client .domain .write_precision import WritePrecision
10
6
from influxdb_client .client .exceptions import InfluxDBError
11
- from influxdb_client import Point
12
- import json
13
-
7
+ from pyarrow .flight import FlightClient , Ticket , FlightCallOptions
14
8
from influxdb_client_3 .read_file import upload_file
15
9
16
10
@@ -21,153 +15,137 @@ def write_client_options(**kwargs):
21
15
def flight_client_options (** kwargs ):
22
16
return kwargs # You can replace this with a specific data structure if needed
23
17
24
-
25
18
class InfluxDBClient3 :
26
19
def __init__ (
27
20
self ,
28
21
host = None ,
29
22
org = None ,
30
23
database = None ,
31
24
token = None ,
32
- _write_client_options = None ,
33
- _flight_client_options = None ,
25
+ write_client_options = None ,
26
+ flight_client_options = None ,
34
27
** kwargs ):
35
28
"""
36
- This class provides an interface for interacting with an InfluxDB server, simplifying common operations such as writing, querying.
37
- * host (str, optional): The hostname or IP address of the InfluxDB server. Defaults to None.
38
- * org (str, optional): The InfluxDB organization name to be used for operations. Defaults to None.
39
- * database (str, optional): The database to be used for InfluxDB operations. Defaults to None.
40
- * token (str, optional): The authentication token for accessing the InfluxDB server. Defaults to None.
41
- * write_options (ANY, optional): Exposes InfuxDB WriteAPI options.
42
- * **kwargs: Additional arguments to be passed to the InfluxDB Client.
29
+ Initialize an InfluxDB client.
30
+
31
+ :param host: The hostname or IP address of the InfluxDB server.
32
+ :type host: str
33
+ :param org: The InfluxDB organization name for operations.
34
+ :type org: str
35
+ :param database: The database for InfluxDB operations.
36
+ :type database: str
37
+ :param token: The authentication token for accessing the InfluxDB server.
38
+ :type token: str
39
+ :param write_client_options: Options for the WriteAPI.
40
+ :type write_client_options: dict
41
+ :param flight_client_options: Options for the FlightClient.
42
+ :type flight_client_options: dict
43
+ :param kwargs: Additional arguments for the InfluxDB Client.
43
44
"""
44
45
self ._org = org
45
46
self ._database = database
46
- self ._write_client_options = _write_client_options if _write_client_options is not None else write_client_options ( write_options = SYNCHRONOUS )
47
+ self ._write_client_options = write_client_options or { ' write_options' : SYNCHRONOUS }
47
48
self ._client = _InfluxDBClient (
48
49
url = f"https://{ host } " ,
49
50
token = token ,
50
51
org = self ._org ,
51
52
** kwargs )
52
53
53
- self ._write_api = _WriteApi (
54
- self ._client , ** self ._write_client_options )
55
-
56
- self ._flight_client_options = _flight_client_options if _flight_client_options is not None else {}
57
- self ._flight_client = FlightClient (
58
- f"grpc+tls://{ host } :443" ,
59
- ** self ._flight_client_options )
54
+ self ._write_api = _WriteApi (self ._client , ** self ._write_client_options )
55
+ self ._flight_client_options = flight_client_options or {}
56
+ self ._flight_client = FlightClient (f"grpc+tls://{ host } :443" , ** self ._flight_client_options )
60
57
61
- # create an authorization header
62
- self ._options = FlightCallOptions (
63
- headers = [(b"authorization" , f"Bearer { token } " .encode ('utf-8' ))])
58
+ # Create an authorization header
59
+ self ._options = FlightCallOptions (headers = [(b"authorization" , f"Bearer { token } " .encode ('utf-8' ))])
64
60
65
61
def write (self , record = None , ** kwargs ):
66
62
"""
67
63
Write data to InfluxDB.
68
64
69
- :type database: str
70
65
:param record: The data point(s) to write.
71
66
:type record: Point or list of Point objects
72
67
:param kwargs: Additional arguments to pass to the write API.
73
68
"""
74
69
try :
75
- self ._write_api .write (
76
- bucket = self ._database , record = record , ** kwargs )
77
- except Exception as e :
78
- print (e )
70
+ self ._write_api .write (bucket = self ._database , record = record , ** kwargs )
71
+ except InfluxDBError as e :
72
+ print (f"InfluxDB Error: { e } " )
79
73
80
- def write_file (
81
- self ,
82
- file ,
83
- measurement_name = None ,
84
- tag_columns = [],
85
- timestamp_column = 'time' ,
86
- ** kwargs ):
74
+ def write_file (self , file , measurement_name = None , tag_columns = None , timestamp_column = 'time' , ** kwargs ):
87
75
"""
88
- Write data from a file to InfluxDB.
76
+ Write data from a file to InfluxDB.
89
77
90
78
:param file: The file to write.
79
+ :type file: str
80
+ :param measurement_name: The name of the measurement.
81
+ :type measurement_name: str
82
+ :param tag_columns: Tag columns.
83
+ :type tag_columns: list
84
+ :param timestamp_column: Timestamp column name. Defaults to 'time'.
85
+ :type timestamp_column: str
91
86
:param kwargs: Additional arguments to pass to the write API.
92
87
"""
93
88
try :
94
- rf = upload_file (file )
95
- Table = rf .load_file ()
96
-
97
- if isinstance (Table , pa .Table ):
98
- df = Table .to_pandas ()
99
- else :
100
- df = Table
101
- print (df )
102
-
103
- measurement_column = None
104
-
105
- if measurement_name is None :
106
- if 'measurement' in df .columns :
107
- measurement_column = 'measurement'
108
- elif 'iox::measurement' in df .columns :
109
- measurement_column = 'iox::measurement'
110
-
111
- if measurement_column is not None :
112
- unique_measurements = df [measurement_column ].unique ()
113
- for measurement in unique_measurements :
114
- df_measurement = df [df [measurement_column ] == measurement ]
115
- df_measurement = df_measurement .drop (columns = [measurement_column ])
116
- print (df_measurement )
117
- self ._write_api .write (bucket = self ._database , record = df_measurement ,
118
- data_frame_measurement_name = measurement ,
119
- data_frame_tag_columns = tag_columns ,
120
- data_frame_timestamp_column = timestamp_column )
121
- else :
122
- print ("'measurement' column not found in the dataframe." )
123
- else :
124
- if 'measurement' in df .columns :
125
- df = df .drop (columns = ['measurement' ])
126
- self ._write_api .write (bucket = self ._database , record = df ,
127
- data_frame_measurement_name = measurement_name ,
128
- data_frame_tag_columns = tag_columns ,
129
- data_frame_timestamp_column = timestamp_column )
89
+ table = upload_file (file ).load_file ()
90
+ df = table .to_pandas () if isinstance (table , pa .Table ) else table
91
+ self ._process_dataframe (df , measurement_name , tag_columns or [], timestamp_column )
130
92
except Exception as e :
131
- print (e )
93
+ print (f"Error writing file: { e } " )
94
+
95
+ def _process_dataframe (self , df , measurement_name , tag_columns , timestamp_column ):
96
+ # This function is factored out for clarity.
97
+ # It processes a DataFrame before writing to InfluxDB.
98
+
99
+ measurement_column = None
100
+ if measurement_name is None :
101
+ measurement_column = next ((col for col in ['measurement' , 'iox::measurement' ] if col in df .columns ), None )
102
+ if measurement_column :
103
+ for measurement in df [measurement_column ].unique ():
104
+ df_measurement = df [df [measurement_column ] == measurement ].drop (columns = [measurement_column ])
105
+ self ._write_api .write (bucket = self ._database , record = df_measurement ,
106
+ data_frame_measurement_name = measurement ,
107
+ data_frame_tag_columns = tag_columns ,
108
+ data_frame_timestamp_column = timestamp_column )
109
+ else :
110
+ print ("'measurement' column not found in the dataframe." )
111
+ else :
112
+ df = df .drop (columns = ['measurement' ], errors = 'ignore' )
113
+ self ._write_api .write (bucket = self ._database , record = df ,
114
+ data_frame_measurement_name = measurement_name ,
115
+ data_frame_tag_columns = tag_columns ,
116
+ data_frame_timestamp_column = timestamp_column )
132
117
133
118
def query (self , query , language = "sql" , mode = "all" ):
134
- # create a flight client pointing to the InfluxDB
135
- # create a ticket
136
- ticket_data = {
137
- "database" : self ._database ,
138
- "sql_query" : query ,
139
- "query_type" : language }
140
-
141
- ticket_bytes = json .dumps (ticket_data )
142
- ticket = Ticket (ticket_bytes )
143
-
144
- # execute the query and return all the data
119
+ """
120
+ Query data from InfluxDB.
121
+
122
+ :param query: The query string.
123
+ :type query: str
124
+ :param language: The query language (default is "sql").
125
+ :type language: str
126
+ :param mode: The mode of fetching data (all, pandas, chunk, reader, schema).
127
+ :type mode: str
128
+ :return: The queried data.
129
+ """
130
+ ticket_data = {"database" : self ._database , "sql_query" : query , "query_type" : language }
131
+ ticket = Ticket (json .dumps (ticket_data ).encode ('utf-8' ))
145
132
flight_reader = self ._flight_client .do_get (ticket , self ._options )
146
133
147
- if mode == "all" :
148
- # use read_all() to get all of the data as an Arrow table
149
- return flight_reader .read_all ()
150
- # use read_all() to get all of the data as an Arrow table
151
- elif mode == "pandas" :
152
- return flight_reader .read_pandas ()
153
- elif mode == "chunk" :
154
- return flight_reader
155
- elif mode == "reader" :
156
- return flight_reader .to_reader ()
157
- elif mode == "schema" :
158
- return flight_reader .schema
159
- else :
160
- return flight_reader .read_all ()
134
+ mode_func = {
135
+ "all" : flight_reader .read_all ,
136
+ "pandas" : flight_reader .read_pandas ,
137
+ "chunk" : lambda : flight_reader ,
138
+ "reader" : flight_reader .to_reader ,
139
+ "schema" : lambda : flight_reader .schema
140
+ }.get (mode , flight_reader .read_all )
141
+
142
+ return mode_func () if callable (mode_func ) else mode_func
161
143
162
144
def close (self ):
163
- # Clean up resources here.
164
- # Call close method of _write_api and _flight_client, if they exist.
165
- if hasattr (self ._write_api , 'close' ):
166
- self ._write_api .close ()
167
- if hasattr (self ._flight_client , 'close' ):
168
- self ._flight_client .close ()
169
- if hasattr (self ._client , 'close' ):
170
- self ._client .close ()
145
+ """Close the client and clean up resources."""
146
+ self ._write_api .close ()
147
+ self ._flight_client .close ()
148
+ self ._client .close ()
171
149
172
150
def __enter__ (self ):
173
151
return self
@@ -182,7 +160,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):
182
160
"PointSettings" ,
183
161
"SYNCHRONOUS" ,
184
162
"ASYNCHRONOUS" ,
185
- "write_client_options" ,
186
163
"WritePrecision" ,
187
- "flight_client_options" ,
188
- "WriteOptions" ]
164
+ "WriteOptions" ,
165
+ "write_client_options" ,
166
+ "flight_client_options"
167
+ ]
0 commit comments