8000 cleaned up class · InfluxCommunity/influxdb3-python@4b4a71d · GitHub
[go: up one dir, main page]

Skip to content

Commit 4b4a71d

Browse files
author
Jay Clifford
committed
cleaned up class
1 parent 33aeba9 commit 4b4a71d

File tree

3 files changed

+98
-119
lines changed

3 files changed

+98
-119
lines changed

Examples/batching-example.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def retry(self, conf, data: str, exception: InfluxDBError):
5959
with InfluxDBClient3.InfluxDBClient3(token=token,
6060
host=url,
6161
org=org,
62-
database=database, enable_gzip=True, _write_client_options=wco) as _client:
62+
database=database, enable_gzip=True, write_client_options=wco) as _client:
6363

6464

6565
# Creating iterator for one hour worth of data (6 sensor readings per

Examples/flight_options_example.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
host="b0c7cce5-8dbc-428e-98c6-7f996fb96467.a.influxdb.io",
1515
org="6a841c0c08328fb1",
1616
database="flightdemo",
17-
_flight_client_options=flight_client_options(
17+
flight_client_options=flight_client_options(
1818
tls_root_certs=cert))
1919

2020

influxdb_client_3/__init__.py

Lines changed: 96 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,10 @@
1-
# influxdb_client_3/__init__.py
2-
1+
import json
32
import pyarrow as pa
4-
from pyarrow.flight import FlightClient, Ticket, FlightCallOptions
5-
from influxdb_client import InfluxDBClient as _InfluxDBClient
6-
from influxdb_client import WriteOptions as WriteOptions
7-
from influxdb_client.client.write_api import WriteApi as _WriteApi
8-
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS, PointSettings
3+
from influxdb_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point
4+
from influxdb_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, PointSettings
95
from influxdb_client.domain.write_precision import WritePrecision
106
from influxdb_client.client.exceptions import InfluxDBError
11-
from influxdb_client import Point
12-
import json
13-
7+
from pyarrow.flight import FlightClient, Ticket, FlightCallOptions
148
from influxdb_client_3.read_file import upload_file
159

1610

@@ -21,153 +15,137 @@ def write_client_options(**kwargs):
2115
def flight_client_options(**kwargs):
2216
return kwargs # You can replace this with a specific data structure if needed
2317

24-
2518
class InfluxDBClient3:
2619
def __init__(
2720
self,
2821
host=None,
2922
org=None,
3023
database=None,
3124
token=None,
32-
_write_client_options=None,
33-
_flight_client_options=None,
25+
write_client_options=None,
26+
flight_client_options=None,
3427
**kwargs):
3528
"""
36-
This class provides an interface for interacting with an InfluxDB server, simplifying common operations such as writing, querying.
37-
* host (str, optional): The hostname or IP address of the InfluxDB server. Defaults to None.
38-
* org (str, optional): The InfluxDB organization name to be used for operations. Defaults to None.
39-
* database (str, optional): The database to be used for InfluxDB operations. Defaults to None.
40-
* token (str, optional): The authentication token for accessing the InfluxDB server. Defaults to None.
41-
* write_options (ANY, optional): Exposes InfuxDB WriteAPI options.
42-
* **kwargs: Additional arguments to be passed to the InfluxDB Client.
29+
Initialize an InfluxDB client.
30+
31+
:param host: The hostname or IP address of the InfluxDB server.
32+
:type host: str
33+
:param org: The InfluxDB organization name for operations.
34+
:type org: str
35+
:param database: The database for InfluxDB operations.
36+
:type database: str
37+
:param token: The authentication token for accessing the InfluxDB server.
38+
:type token: str
39+
:param write_client_options: Options for the WriteAPI.
40+
:type write_client_options: dict
41+
:param flight_client_options: Options for the FlightClient.
42+
:type flight_client_options: dict
43+
:param kwargs: Additional arguments for the InfluxDB Client.
4344
"""
4445
self._org = org
4546
self._database = database
46-
self._write_client_options = _write_client_options if _write_client_options is not None else write_client_options(write_options=SYNCHRONOUS)
47+
self._write_client_options = write_client_options or {'write_options': SYNCHRONOUS}
4748
self._client = _InfluxDBClient(
4849
url=f"https://{host}",
4950
token=token,
5051
org=self._org,
5152
**kwargs)
5253

53-
self._write_api = _WriteApi(
54-
self._client, **self._write_client_options)
55-
56-
self._flight_client_options = _flight_client_options if _flight_client_options is not None else {}
57-
self._flight_client = FlightClient(
58-
f"grpc+tls://{host}:443",
59-
**self._flight_client_options)
54+
self._write_api = _WriteApi(self._client, **self._write_client_options)
55+
self._flight_client_options = flight_client_options or {}
56+
self._flight_client = FlightClient(f"grpc+tls://{host}:443", **self._flight_client_options)
6057

61-
# create an authorization header
62-
self._options = FlightCallOptions(
63-
headers=[(b"authorization", f"Bearer {token}".encode('utf-8'))])
58+
# Create an authorization header
59+
self._options = FlightCallOptions(headers=[(b"authorization", f"Bearer {token}".encode('utf-8'))])
6460

6561
def write(self, record=None, **kwargs):
6662
"""
6763
Write data to InfluxDB.
6864
69-
:type database: str
7065
:param record: The data point(s) to write.
7166
:type record: Point or list of Point objects
7267
:param kwargs: Additional arguments to pass to the write API.
7368
"""
7469
try:
75-
self._write_api.write(
76-
bucket=self._database, record=record, **kwargs)
77-
except Exception as e:
78-
print(e)
70+
self._write_api.write(bucket=self._database, record=record, **kwargs)
71+
except InfluxDBError as e:
72+
print(f"InfluxDB Error: {e}")
7973

80-
def write_file(
81-
self,
82-
file,
83-
measurement_name=None,
84-
tag_columns=[],
85-
timestamp_column='time',
86-
**kwargs):
74+
def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_column='time', **kwargs):
8775
"""
88-
Write data from a file to InfluxDB.
76+
Write data from a file to InfluxDB.
8977
9078
:param file: The file to write.
79+
:type file: str
80+
:param measurement_name: The name of the measurement.
81+
:type measurement_name: str
82+
:param tag_columns: Tag columns.
83+
:type tag_columns: list
84+
:param timestamp_column: Timestamp column name. Defaults to 'time'.
85+
:type timestamp_column: str
9186
:param kwargs: Additional arguments to pass to the write API.
9287
"""
9388
try:
94-
rf = upload_file(file)
95-
Table = rf.load_file()
96-
97-
if isinstance(Table, pa.Table):
98-
df = Table.to_pandas()
99-
else:
100-
df = Table
101-
print(df)
102-
103-
measurement_column = None
104-
105-
if measurement_name is None:
106-
if 'measurement' in df.columns:
107-
measurement_column = 'measurement'
108-
elif 'iox::measurement' in df.columns:
109-
measurement_column = 'iox::measurement'
110-
111-
if measurement_column is not None:
112-
unique_measurements = df[measurement_column].unique()
113-
for measurement in unique_measurements:
114-
df_measurement = df[df[measurement_column] == measurement]
115-
df_measurement = df_measurement.drop(columns=[measurement_column])
116-
print(df_measurement)
117-
self._write_api.write(bucket=self._database, record=df_measurement,
118-
data_frame_measurement_name=measurement,
119-
data_frame_tag_columns=tag_columns,
120-
data_frame_timestamp_column=timestamp_column)
121-
else:
122-
print("'measurement' column not found in the dataframe.")
123-
else:
124-
if 'measurement' in df.columns:
125-
df = df.drop(columns=['measurement'])
126-
self._write_api.write(bucket=self._database, record=df,
127-
data_frame_measurement_name=measurement_name,
128-
data_frame_tag_columns=tag_columns,
129-
data_frame_timestamp_column=timestamp_column)
89+
table = upload_file(file).load_file()
90+
df = table.to_pandas() if isinstance(table, pa.Table) else table
91+
self._process_dataframe(df, measurement_name, tag_columns or [], timestamp_column)
13092
except Exception as e:
131-
print(e)
93+
print(f"Error writing file: {e}")
94+
95+
def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column):
96+
# This function is factored out for clarity.
97+
# It processes a DataFrame before writing to InfluxDB.
98+
99+
measurement_column = None
100+
if measurement_name is None:
101+
measurement_column = next((col for col in ['measurement', 'iox::measurement'] if col in df.columns), None)
102+
if measurement_column:
103+
for measurement in df[measurement_column].unique():
104+
df_measurement = df[df[measurement_column] == measurement].drop(columns=[measurement_column])
105+
self._write_api.write(bucket=self._database, record=df_measurement,
106+
data_frame_measurement_name=measurement,
107+
data_frame_tag_columns=tag_columns,
108+
data_frame_timestamp_column=timestamp_column)
109+
else:
110+
print("'measurement' column not found in the dataframe.")
111+
else:
112+
df = df.drop(columns=['measurement'], errors='ignore')
113+
self._write_api.write(bucket=self._database, record=df,
114+
data_frame_measurement_name=measurement_name,
115+
data_frame_tag_columns=tag_columns,
116+
data_frame_timestamp_column=timestamp_column)
132117

133118
def query(self, query, language="sql", mode="all"):
134-
# create a flight client pointing to the InfluxDB
135-
# create a ticket
136-
ticket_data = {
137-
"database": self._database,
138-
"sql_query": query,
139-
"query_type": language}
140-
141-
ticket_bytes = json.dumps(ticket_data)
142-
ticket = Ticket(ticket_bytes)
143-
144-
# execute the query and return all the data
119+
"""
120+
Query data from InfluxDB.
121+
122+
:param query: The query string.
123+
:type query: str
124+
:param language: The query language (default is "sql").
125+
:type language: str
126+
:param mode: The mode of fetching data (all, pandas, chunk, reader, schema).
127+
:type mode: str
128+
:return: The queried data.
129+
"""
130+
ticket_data = {"database": self._database, "sql_query": query, "query_type": language}
131+
ticket = Ticket(json.dumps(ticket_data).encode('utf-8'))
145132
flight_reader = self._flight_client.do_get(ticket, self._options)
146133

147-
if mode == "all":
148-
# use read_all() to get all of the data as an Arrow table
149-
return flight_reader.read_all()
150-
# use read_all() to get all of the data as an Arrow table
151-
elif mode == "pandas":
152-
return flight_reader.read_pandas()
153-
elif mode == "chunk":
154-
return flight_reader
155-
elif mode == "reader":
156-
return flight_reader.to_reader()
157-
elif mode == "schema":
158-
return flight_reader.schema
159-
else:
160-
return flight_reader.read_all()
134+
mode_func = {
135+
"all": flight_reader.read_all,
136+
"pandas": flight_reader.read_pandas,
137+
"chunk": lambda: flight_reader,
138+
"reader": flight_reader.to_reader,
139+
"schema": lambda: flight_reader.schema
140+
}.get(mode, flight_reader.read_all)
141+
142+
return mode_func() if callable(mode_func) else mode_func
161143

162144
def close(self):
163-
# Clean up resources here.
164-
# Call close method of _write_api and _flight_client, if they exist.
165-
if hasattr(self._write_api, 'close'):
166-
self._write_api.close()
167-
if hasattr(self._flight_client, 'close'):
168-
self._flight_client.close()
169-
if hasattr(self._client, 'close'):
170-
self._client.close()
145+
"""Close the client and clean up resources."""
146+
self._write_api.close()
147+
self._flight_client.close()
148+
self._client.close()
171149

172150
def __enter__(self):
173151
return self
@@ -182,7 +160,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):
182160
"PointSettings",
183161
"SYNCHRONOUS",
184162
"ASYNCHRONOUS",
185-
"write_client_options",
186163
"WritePrecision",
187-
"flight_client_options",
188-
"WriteOptions"]
164+
"WriteOptions",
165+
"write_client_options",
166+
"flight_client_options"
167+
]

0 commit comments

Comments
 (0)
0