8000 Merge pull request #37 from InfluxCommunity/36-expose-flightcalloptio… · InfluxCommunity/influxdb3-python@6dad255 · GitHub
[go: up one dir, main page]

Skip to content

Commit 6dad255

Browse files
Merge pull request #37 from InfluxCommunity/36-expose-flightcalloptions-in-influxdbclient3-constructor
added FlightCallOptions and database parameters
2 parents 38e9fcb + 067e649 commit 6dad255

File tree

2 files changed

+134
-13
lines changed

2 files changed

+134
-13
lines changed
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
from influxdb_client_3 import InfluxDBClient3,InfluxDBError,WriteOptions,write_client_options
2+
import pandas as pd
3+
import numpy as np
4+
import random
5+
6+
7+
now = pd.Timestamp.now(tz='UTC').floor('ms')
8+
two_days_ago = now
9+
10+
11+
class BatchingCallback(object):
12+
13+
def success(self, conf, data: str):
14+
print(f"Written batch: {conf}, data: {data}")
15+
16+
def error(self, conf, data: str, exception: InfluxDBError):
17+
print(f"Cannot write batch: {conf}, data: {data} due: {exception}")
18+
19+
def retry(self, conf, data: str, exception: InfluxDBError):
20+
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
21+
22+
callback = BatchingCallback()
23+
24+
25+
write_options = WriteOptions(batch_size=100,
26+
flush_interval=10_000,
27+
jitter_interval=2_000,
28+
retry_interval=5_000,
29+
max_retries=5,
30+
max_retry_delay=30_000,
31+
exponential_base=2)
32+
33+
wco = write_client_options(success_callback=callback.success,
34+
error_callback=callback.error,
35+
retry_callback=callback.retry,
36+
WriteOptions=write_options
37+
)
38+
39+
client = InfluxDBClient3(
40+
token="",
41+
host="eu-central-1-1.aws.cloud2.influxdata.com",
42+
org="6a841c0c08328fb1", enable_gzip=True, write_client_options=wco)
43+
44+
now = pd.Timestamp.now(tz='UTC').floor('ms')
45+
46+
# Lists of possible trainers
47+
trainers = ["ash", "brock", "misty", "gary", "jessie", "james"]
48+
49+
# Read the CSV into a DataFrame
50+
pokemon_df = pd.read_csv("https://gist.githubusercontent.com/ritchie46/cac6b337ea52281aa23c049250a4ff03/raw/89a957ff3919d90e6ef2d34235e6bf22304f3366/pokemon.csv")
51+
52+
# Creating an empty list to store the data
53+
data = []
54+
55+
# Dictionary to keep track of the number of times each trainer has caught each Pokémon
56+
trainer_pokemon_counts = {}
57+
58+
# Number of entries we want to create
59+
num_entries = 1000
60+
61+
# Generating random data
62+
for i in range(num_entries):
63+
trainer = random.choice(trainers)
64+
65+
# Randomly select a row from pokemon_df
66+
random_pokemon = pokemon_df.sample().iloc[0]
67+
caught = random_pokemon['Name']
68+
69+
# Count the number of times this trainer has caught this Pokémon
70+
if (trainer, caught) in trainer_pokemon_counts:
71+
trainer_pokemon_counts[(trainer, caught)] += 1
72+
else:
73+
trainer_pokemon_counts[(trainer, caught)] = 1
74+
75+
# Get the number for this combination of trainer and Pokémon
76+
num = trainer_pokemon_counts[(trainer, caught)]
77+
78+
entry = {
79+
"trainer": trainer,
80+
"id": f"{0000 + random_pokemon['#']:04d}",
81+
"num": str(num),
82+
"caught": caught,
83+
"level": random.randint(5, 20),
84+
"attack": random_pokemon['Attack'],
85+
"defense": random_pokemon['Defense'],
86+
"hp": random_pokemon['HP'],
87+
"speed": random_pokemon['Speed'],
88+
"type1": random_pokemon['Type 1'],
89+
"type2": random_pokemon['Type 2'],
90+
"timestamp": two_days_ago
91+
}
92+
data.append(entry)
93+
94+
# Convert the list of dictionaries to a DataFrame
95+
caught_pokemon_df = pd.DataFrame(data).set_index('timestamp')
96+
97+
# Print the DataFrame
98+
#print(caught_pokemon_df)
99+
100+
101+
102+
103+
104+
# Query
105+
try:
106+
table = client.query(query='''SELECT * FROM caught WHERE time >= now() - 30m''', database='pokemon-codex', timeout=90.0, language='sql', mode='pandas')
107+
print(table)
108+
except Exception as e:
109+
print(f"Error querying points: {e}")

influxdb_client_3/__init__.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ def default_client_options(**kwargs):
1818
def flight_client_options(**kwargs):
1919
return kwargs # You can replace this with a specific data structure if needed
2020

21+
2122
class InfluxDBClient3:
2223
def __init__(
2324
self,
@@ -47,6 +48,7 @@ def __init__(
4748
"""
4849
self._org = org
4950
self._database = database
51+
self._token = token
5052
self._write_client_options = write_client_options if write_client_options is not None else default_client_options(write_options=SYNCHRONOUS)
5153

5254
# Extracting the hostname from URL if provided
@@ -55,32 +57,34 @@ def __init__(
5557

5658
self._client = _InfluxDBClient(
5759
url=f"https://{host}",
58-
token=token,
60+
token=self._token,
5961
org=self._org,
6062
**kwargs)
6163

6264
self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options)
6365
self._flight_client_options = flight_client_options or {}
6466
self._flight_client = FlightClient(f"grpc+tls://{host}:443", **self._flight_client_options)
6567

66-
# Create an authorization header
67-
self._options = FlightCallOptions(headers=[(b"authorization", f"Bearer {token}".encode('utf-8'))])
6868

69-
def write(self, record=None, **kwargs):
69+
70+
def write(self, record=None, database=None ,**kwargs):
7071
"""
7172
Write data to InfluxDB.
7273
7374
:param record: The data point(s) to write.
7475
:type record: Point or list of Point objects
7576
:param kwargs: Additional arguments to pass to the write API.
7677
"""
78+
if database is None:
79+
database = self._database
80+
7781
try:
78-
self._write_api.write(bucket=self._database, record=record, **kwargs)
82+
self._write_api.write(bucket=database, record=record, **kwargs)
7983
except InfluxDBError as e:
8084
raise e
8185

8286

83-
def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_column='time', **kwargs):
87+
def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_column='time', database=None, **kwargs):
8488
"""
8589
Write data from a file to InfluxDB.
8690
@@ -94,15 +98,18 @@ def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_co
9498
:type timestamp_column: str
9599
:param kwargs: Additional arguments to pass to the write API.
96100
"""
101+
if database is None:
102+
database = self._database
103+
97104
try:
98105
table = upload_file(file).load_file()
99106
df = table.to_pandas() if isinstance(table, pa.Table) else table
100-
self._process_dataframe(df, measurement_name, tag_columns or [], timestamp_column)
107+
self._process_dataframe(df, measurement_name, tag_columns or [], timestamp_column, database=database, **kwargs)
101108
except Exception as e:
102109
raise e
103110

104111

105-
def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column):
112+
def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column, database, **kwargs):
106113
# This function is factored out for clarity.
107114
# It processes a DataFrame before writing to InfluxDB.
108115

@@ -120,12 +127,12 @@ def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column
120127
print("'measurement' column not found in the dataframe.")
121128
else:
122129
df = df.drop(columns=['measurement'], errors='ignore')
123-
self._write_api.write(bucket=self._database, record=df,
130+
self._write_api.write(bucket=database, record=df,
124131
data_frame_measurement_name=measurement_name,
125132
data_frame_tag_columns=tag_columns,
126-
data_frame_timestamp_column=timestamp_column)
133+
data_frame_timestamp_column=timestamp_column, **kwargs)
127134

128-
def query(self, query, language="sql", mode="all"):
135+
def query(self, query, language="sql", mode="all", database=None, **kwargs ):
129136
"""
130137
Query data from InfluxDB.
131138
@@ -137,10 +144,15 @@ def query(self, query, language="sql", mode="all"):
137144
:type mode: str
138145
:return: The queried data.
139146
"""
147+
if database is None:
148+
database = self._database
149+
140150
try:
141-
ticket_data = {"database": self._database, "sql_query": query, "query_type": language}
151+
# Create an authorization header
152+
_options = FlightCallOptions(headers=[(b"authorization", f"Bearer {self._token}".encode('utf-8'))], **kwargs)
153+
ticket_data = {"database": database, "sql_query": query, "query_type": language}
142154
ticket = Ticket(json.dumps(ticket_data).encode('utf-8'))
143-
flight_reader = self._flight_client.do_get(ticket, self._options)
155+
flight_reader = self._flight_client.do_get(ticket, _options)
144156

145157
mode_func = {
146158
"all": flight_reader.read_all,

0 commit comments

Comments
 (0)
0