8000 Merge pull request #55 from InfluxCommunity/52-create-common-communit… · InfluxCommunity/influxdb3-python@99a17d1 · GitHub
[go: up one dir, main page]

Skip to content

Commit 99a17d1

Browse files
Merge pull request #55 from InfluxCommunity/52-create-common-community-examples
Community examples
2 parents adb95ef + f1cc92d commit 99a17d1

File tree

1 file changed

+78
-0
lines changed

1 file changed

+78
-0
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import random
2+
import pymongo
3+
import pandas as pd
4+
from bson import ObjectId
5+
import influxdb_client_3 as InfluxDBClient3
6+
import pandas as pd
7+
import numpy as np
8+
from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError
9+
import datetime
10+
import time
11+
12+
13+
class BatchingCallback(object):
14+
15+
def success(self, conf, data: str):
16+
print(f"Written batch: {conf}, data: {data}")
17+
18+
def error(self, conf, data: str, exception: InfluxDBError):
19+
print(f"Cannot write batch: {conf}, data: {data} due: {exception}")
20+
21+
def retry(self, conf, data: str, exception: InfluxDBError):
22+
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
23+
24+
25+
26+
27+
# InfluxDB connection details
28+
token = ""
29+
org = "6a841c0c08328fb1"
30+
dbfrom = "a"
31+
dbto = "b"
32+
url = "eu-central-1-1.aws.cloud2.influxdata.com"
33+
measurement = "airSensors"
34+
taglist= []
35+
36+
callback = BatchingCallback()
37+
38+
write_options = WriteOptions(batch_size=5_000,
39+
flush_interval=10_000,
40+
jitter_interval=2_000,
41+
retry_interval=5_000,
42+
max_retries=5,
43+
max_retry_delay=30_000,
44+
exponential_base=2)
45+
46+
wco = write_client_options(success_callback=callback.success,
47+
error_callback=callback.error,
48+
retry_callback=callback.retry,
49+
WriteOptions=write_options
50+
)
51+
# Opening InfluxDB client with a batch size of 5k points or flush interval
52+
# of 10k ms and gzip compression
53+
with InfluxDBClient3.InfluxDBClient3(token=token,
54+
host=url,
55+
org=org,
56+
enable_gzip=True, write_client_options=wco) as _client:
57+
query = f"SHOW TAG KEYS FROM {measurement}"
58+
tags = _client.query(query=query, language="influxql", database=dbfrom)
59+
tags = tags.to_pydict()
60+
taglist = tags['tagKey']
61+
62+
63+
query = f"SELECT * FROM {measurement}"
64+
reader = _client.query(query=query, language="influxql", database=dbfrom, mode="chunk")
65+
try:
66+
while True:
67+
batch, buff = reader.read_chunk()
68+
print("batch:")
69+
pd = batch.to_pandas()
70+
pd = pd.set_index('time')
71+
print(pd)
72+
_client.write(database=dbto, record=pd, data_frame_measurement_name=measurement, data_frame_tag_columns=taglist)
73+
time.sleep(2)
74+
except StopIteration:
75+
print("No more chunks to read")
76+
77+
78+

0 commit comments

Comments
 (0)
0