|
| 1 | +import random |
| 2 | +import pymongo |
| 3 | +import pandas as pd |
| 4 | +from bson import ObjectId |
| 5 | +import influxdb_client_3 as InfluxDBClient3 |
| 6 | +import pandas as pd |
| 7 | +import numpy as np |
| 8 | +from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError |
| 9 | +import datetime |
| 10 | +import time |
| 11 | + |
| 12 | + |
| 13 | +class BatchingCallback(object): |
| 14 | + |
| 15 | + def success(self, conf, data: str): |
| 16 | + print(f"Written batch: {conf}, data: {data}") |
| 17 | + |
| 18 | + def error(self, conf, data: str, exception: InfluxDBError): |
| 19 | + print(f"Cannot write batch: {conf}, data: {data} due: {exception}") |
| 20 | + |
| 21 | + def retry(self, conf, data: str, exception: InfluxDBError): |
| 22 | + print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") |
| 23 | + |
| 24 | + |
| 25 | + |
| 26 | + |
| 27 | +# InfluxDB connection details |
| 28 | +token = "" |
| 29 | +org = "6a841c0c08328fb1" |
| 30 | +dbfrom = "a" |
| 31 | +dbto = "b" |
| 32 | +url = "eu-central-1-1.aws.cloud2.influxdata.com" |
| 33 | +measurement = "airSensors" |
| 34 | +taglist= [] |
| 35 | + |
| 36 | +callback = BatchingCallback() |
| 37 | + |
| 38 | +write_options = WriteOptions(batch_size=5_000, |
| 39 | + flush_interval=10_000, |
| 40 | + jitter_interval=2_000, |
| 41 | + retry_interval=5_000, |
| 42 | + max_retries=5, |
| 43 | + max_retry_delay=30_000, |
| 44 | + exponential_base=2) |
| 45 | + |
| 46 | +wco = write_client_options(success_callback=callback.success, |
| 47 | + error_callback=callback.error, |
| 48 | + retry_callback=callback.retry, |
| 49 | + WriteOptions=write_options |
| 50 | + ) |
| 51 | +# Opening InfluxDB client with a batch size of 5k points or flush interval |
| 52 | +# of 10k ms and gzip compression |
| 53 | +with InfluxDBClient3.InfluxDBClient3(token=token, |
| 54 | + host=url, |
| 55 | + org=org, |
| 56 | + enable_gzip=True, write_client_options=wco) as _client: |
| 57 | + query = f"SHOW TAG KEYS FROM {measurement}" |
| 58 | + tags = _client.query(query=query, language="influxql", database=dbfrom) |
| 59 | + tags = tags.to_pydict() |
| 60 | + taglist = tags['tagKey'] |
| 61 | + |
| 62 | + |
| 63 | + query = f"SELECT * FROM {measurement}" |
| 64 | + reader = _client.query(query=query, language="influxql", database=dbfrom, mode="chunk") |
| 65 | + try: |
| 66 | + while True: |
| 67 | + batch, buff = reader.read_chunk() |
| 68 | + print("batch:") |
| 69 | + pd = batch.to_pandas() |
| 70 | + pd = pd.set_index('time') |
| 71 | + print(pd) |
| 72 | + _client.write(database=dbto, record=pd, data_frame_measurement_name=measurement, data_frame_tag_columns=taglist) |
| 73 | + time.sleep(2) |
| 74 | + except StopIteration: |
| 75 | + print("No more chunks to read") |
| 76 | + |
| 77 | + |
| 78 | + |
0 commit comments