influxdb_client_3
is a Python module that provides a simple and convenient way to interact with InfluxDB 3.0. This module supports both writing data to InfluxDB and querying data using the Flight client, which allows you to execute SQL and InfluxQL queries on InfluxDB 3.0.
pyarrow
influxdb-client
These are installed as part of the package
You can install 'influxdb3-python' using pip
:
pip install influxdb3-python
Note: Please make sure you are using 3.6 or above. For the best performance use 3.11+
from influxdb_client_3 import InfluxDBClient3, Point
If you are using InfluxDB Cloud, then you should note that:
- You will need to supply your org id, this is not necessary for InfluxDB Dedicated.
- Use a bucketname for the database argument.
client = InfluxDBClient3(token="your-token",
host="your-host",
org="your-org",
database="your-database")
You can write data using the Point class, or supplying line protocol.
point = Point("measurement").tag("location", "london").field("temperature", 42)
client.write(point)
point = "measurement fieldname=0"
client.write(point)
Users can import data from CSV, JSON, Feather, ORC, Parquet
import influxdb_client_3 as InfluxDBClient3
import pandas as pd
import numpy as np
from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError
class BatchingCallback(object):
def success(self, conf, data: str):
print(f"Written batch: {conf}, data: {data}")
def error(self, conf, data: str, exception: InfluxDBError):
print(f"Cannot write batch: {conf}, data: {data} due: {exception}")
def retry(self, conf, data: str, exception: InfluxDBError):
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
callback = BatchingCallback()
write_options = WriteOptions(batch_size=500,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2)
wco = write_client_options(success_callback=callback.success,
error_callback=callback.error,
retry_callback=callback.retry,
WriteOptions=write_options
)
with InfluxDBClient3.InfluxDBClient3(
token="INSERT_TOKEN",
host="eu-central-1-1.aws.cloud2.influxdata.com",
org="6a841c0c08328fb1",
database="python", write_client_options=wco) as client:
client.write_file(
file='./out.csv',
timestamp_column='time', tag_columns=["provider", "machineID"])
client.write_file(
file='./out.json',
timestamp_column='time', tag_columns=["provider", "machineID"], date_unit='ns' )
query = "select * from measurement"
reader = client.query(query=query, language="sql")
table = reader.read_all()
print(table.to_pandas().to_markdown())
query = "select * from measurement"
reader = client.query(query=query, language="influxql")
table = reader.read_all()
print(table.to_pandas().to_markdown())