8000 feat(write): add support for callback notification for batching mode … · chaosegg/influxdb-client-python@a86a561 · GitHub
[go: up one dir, main page]

Skip to content

Commit a86a561

Browse files
authored
feat(write): add support for callback notification for batching mode (influxdata#341)
1 parent 9b20924 commit a86a561

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+462
-135
lines changed

.circleci/config.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ jobs:
124124
python examples/monitoring_and_alerting.py
125125
python examples/buckets_management.py
126126
python examples/write_structured_data.py
127+
python examples/write_api_callbacks.py
127128
check-sphinx:
128129
docker:
129130
- image: *default-python

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
### Features
44
1. [#330](https://github.com/influxdata/influxdb-client-python/pull/330): Add support for write structured data - `NamedTuple`, `Data Classes`
55
1. [#335](https://github.com/influxdata/influxdb-client-python/pull/335): Add support for custom precision for index specified as number [DataFrame]
6+
1. [#341](https://github.com/influxdata/influxdb-client-python/pull/341): Add support for handling batch events
67

78
### Documentation
89
1. [#331](https://github.com/influxdata/influxdb-client-python/pull/331): Add [Migration Guide](MIGRATION_GUIDE.rst)
10+
1. [#341](https://github.com/influxdata/influxdb-client-python/pull/341): How to handle client errors
911

1012
## 1.21.0 [2021-09-17]
1113

README.rst

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ InfluxDB 2.0 client features
8383
- `Proxy configuration`_
8484
- `Nanosecond precision`_
8585
- `Delete data`_
86+
- `Handling Errors`_
8687

8788
Installation
8889
------------
@@ -1152,8 +1153,62 @@ The following forward compatible APIs are available:
11521153

11531154
For detail info see `InfluxDB 1.8 example <examples/influxdb_18_example.py>`_.
11541155

1156+
Handling Errors
1157+
^^^^^^^^^^^^^^^
1158+
.. marker-handling-errors-start
1159+
1160+
Errors happen and it's important that your code is prepared for them. All client related exceptions are delivered from
1161+
``InfluxDBError``. If the exception cannot be recovered in the client it is returned to the application.
1162+
These exceptions are left for the developer to handle.
1163+
1164+
Almost all APIs directly return unrecoverable exceptions to be handled this way:
1165+
1166+
.. code-block:: python
1167+
1168+
from influxdb_client import InfluxDBClient
1169+
from influxdb_client.client.exceptions import InfluxDBError
1170+
from influxdb_client.client.write_api import SYNCHRONOUS
1171+
1172+
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
1173+
try:
1174+
client.write_api(write_options=SYNCHRONOUS).write("my-bucket", record="mem,tag=a value=86")
1175+
except InfluxDBError as e:
1176+
if e.response.status == 401:
1177+
raise Exception(f"Insufficient write permissions to 'my-bucket'.") from e
1178+
raise
1179+
1180+
1181+
The only exception is **batching** ``WriteAPI`` (for more info see `Batching`_). where you need to register custom callbacks to handle batch events.
1182+
This is because this API runs in the ``background`` in a ``separate`` thread and isn't possible to directly
1183+
return underlying exceptions.
1184+
1185+
.. code-block:: python
1186+
1187+
from influxdb_client import InfluxDBClient
1188+
from influxdb_client.client.exceptions import InfluxDBError
1189+
1190+
1191+
class BatchingCallback(object):
1192+
1193+
def success(self, conf: (str, str, str), data: str):
1194+
print(f"Written batch: {conf}, data: {data}")
1195+
1196+
def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
1197+
print(f"Cannot write batch: {conf}, data: {data} due: {exception}")
1198+
1199+
def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
1200+
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
1201+
1202+
1203+
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
1204+
callback = BatchingCallback()
1205+
with client.write_api(success_callback=callback.success,
1206+
error_callback=callback.error,
1207+
retry_callback=callback.retry) as write_api:
1208+
pass
1209+
11551210
HTTP Retry Strategy
1156-
^^^^^^^^^^^^^^^^^^^
1211+
"""""""""""""""""""
11571212
By default the client uses a retry strategy only for batching writes (for more info see `Batching`_).
11581213
For other HTTP requests there is no one retry strategy, but it could be configured by ``retries``
11591214
parameter of ``InfluxDBClient``.
@@ -1169,6 +1224,8 @@ For more info about how configure HTTP retry see details in `urllib3 documentati
11691224
retries = Retry(connect=5, read=2, redirect=5)
11701225
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", retries=retries)
11711226
1227+
.. marker-handling-errors-end
1228+
11721229
Nanosecond precision
11731230
^^^^^^^^^^^^^^^^^^^^
11741231
.. marker-nanosecond-start

docs/usage.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ Nanosecond precision
4646
:start-after: marker-nanosecond-start
4747
:end-before: marker-nanosecond-end
4848

49+
Handling Errors
50+
^^^^^^^^^^^^^^^
51+
.. include:: ../README.rst
52+
:start-after: marker-handling-errors-start
53+
:end-before: marker-handling-errors-end
54+
4955
Debugging
5056
^^^^^^^^^
5157

examples/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
- [ingest_large_dataframe.py](ingest_large_dataframe.py) - How to ingest large DataFrame
88
- [iot_sensor.py](iot_sensor.py) - How to write sensor data every minute by [RxPY](https://rxpy.readthedocs.io/en/latest/)
99
- [import_data_set_sync_batching.py](import_data_set_sync_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for synchronous write into InfluxDB
10+
- [write_api_callbacks.py](write_api_callbacks.py) - How to handle batch events
1011
- [write_structured_data.py](write_structured_data.py) - How to write structured data - [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple), [Data Classes](https://docs.python.org/3/library/dataclasses.html) - (_requires Python v3.8+_)
1112

1213
## Queries

examples/write_api_callbacks.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
"""
2+
How to use WriteApi's callbacks to notify about state of background batches.
3+
"""
4+
5+
from influxdb_client import InfluxDBClient, Point
6+
from influxdb_client.client.exceptions import InfluxDBError
7+
8+
"""
9+
Configuration
10+
"""
11+
url = 'http://localhost:8086'
12+
token = 'my-token'
13+
org = 'my-org'
14+
bucket = 'my-bucket'
15+
16+
"""
17+
Data
18+
"""
19+
points = [Point("my-temperature").tag("location", "Prague").field("temperature", 25.3),
20+
Point("my-temperature").tag("location", "New York").field("temperature", 18.4)]
21+
22+
23+
class BatchingCallback(object):
24+
25+
def success(self, conf: (str, str, str), data: str):
26+
"""Successfully writen batch."""
27+
print(f"Written batch: {conf}, data: {data}")
28+
29+
def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
30+
"""Unsuccessfully writen batch."""
31+
print(f"Cannot write batch: {conf}, data: {data} due: {exception}")
32+
33+
def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
34+
"""Retryable error."""
35+
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
36+
37+
38+
callback = BatchingCallback()
39+
with InfluxDBClient(url=url, token=token, org=org) as client:
40+
"""
41+
Use batching API
42+
"""
43+
with client.write_api(success_callback=callback.success,
44+
error_callback=callback.error,
45+
retry_callback=callback.retry) as write_api:
46+
write_api.write(bucket=bucket, record=points)
47+
print()
48+
print("Wait to finishing ingesting...")
49+
print()

influxdb_client/client/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
# flake8: noqa
22

33
"""
4-
Influx API Service.
4+
Influx OSS API Service.
55
66
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501
77
8-
OpenAPI spec version: 0.1.0
8+
OpenAPI spec version: 2.0.0
99
Generated by: https://openapi-generator.tech
1010
"""
1111

@@ -19,6 +19,7 @@
1919
from influxdb_client.service.checks_service import ChecksService
2020
from influxdb_client.service.dbr_ps_service import DBRPsService
2121
from influxdb_client.service.dashboards_service import DashboardsService
22+
from influxdb_client.service.delete_service import DeleteService
2223
from influxdb_client.service.health_service import HealthService
2324
from influxdb_client.service.labels_service import LabelsService
2425
from influxdb_client.service.notification_endpoints_service import NotificationEndpointsService

influxdb_client/client/influxdb_client.py

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -247,15 +247,88 @@ def from_env_properties(cls, debug=None, enable_gzip=False):
247247
connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic),
248248
profilers=profilers)
249249

250-
def write_api(self, write_options=WriteOptions(), point_settings=PointSettings()) -> WriteApi:
250+
def write_api(self, write_options=WriteOptions(), point_settings=PointSettings(), **kwargs) -> WriteApi:
251251
"""
252252
Create a Write API instance.
253253
254-
:param point_settings:
255-
:param write_options: write api configuration
254+
Example:
255+
.. code-block:: python
256+
257+
from influxdb_client import InfluxDBClient
258+
from influxdb_client.client.write_api import SYNCHRONOUS
259+
260+
261+
# Initialize SYNCHRONOUS instance of WriteApi
262+
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
263+
write_api = client.write_api(write_options=SYNCHRONOUS)
264+
265+
If you would like to use a **background batching**, you have to configure client like this:
266+
267+
.. code-block:: python
268+
269+
from influxdb_client import InfluxDBClient
270+
271+
# Initialize background batching instance of WriteApi
272+
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
273+
with client.write_api() as write_api:
274+
pass
275+
276+
There is also possibility to use callbacks to notify about state of background batches:
277+
278+
.. code-block:: python
279+
280+
from influxdb_client import InfluxDBClient
281+
from influxdb_client.client.exceptions import InfluxDBError
282+
283+
284+
class BatchingCallback(object):
285+
286+
def success(self, conf: (str, str, str), data: str):
287+
print(f"Written batch: {conf}, data: {data}")
288+
289+
def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
290+
print(f"Cannot write batch: {conf}, data: {data} due: {exception}")
291+
292+
def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
293+
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
294+
295+
296+
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
297+
callback = BatchingCallback()
298+
with client.write_api(success_callback=callback.success,
299+
error_callback=callback.error,
300+
retry_callback=callback.retry) as write_api:
301+
pass
302+
303+
:param write_options: Write API configuration
304+
:param point_settings: settings to store default tags
305+
:key success_callback: The callable ``callback`` to run after successfully writen a batch.
306+
307+
The callable must accept two arguments:
308+
- `Tuple`: ``(bucket, organization, precision)``
309+
- `str`: written data
310+
311+
**[batching mode]**
312+
313+
:key error_callback: The callable ``callback`` to run after unsuccessfully writen a batch.
314+
315+
The callable must accept three arguments:
316+
- `Tuple`: ``(bucket, organization, precision)``
317+
- `str`: written data
318+
- `Exception`: an occurred error
319+
320+
**[batching mode]**
321+
:key retry_callback: The callable ``callback`` to run after retryable error occurred.
322+
323+
The callable must accept three arguments:
324+
- `Tuple`: ``(bucket, organization, precision)``
325+
- `str`: written data
326+
- `Exception`: an retryable error
327+
328+
**[batching mode]**
256329
:return: write api instance
257330
"""
258-
return WriteApi(influxdb_client=self, write_options=write_options, point_settings=point_settings)
331+
return WriteApi(influxdb_client=self, write_options=write_options, point_settings=point_settings, **kwargs)
259332

260333
def query_api(self, query_options: QueryOptions = QueryOptions()) -> QueryApi:
261334
"""

influxdb_client/client/write/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
# flake8: noqa
22

33
"""
4-
Influx API Service.
4+
Influx OSS API Service.
55
66
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501
77
8-
OpenAPI spec version: 0.1.0
8+
OpenAPI spec version: 2.0.0
99
Generated by: https://openapi-generator.tech
1010
"""
1111

@@ -19,6 +19,7 @@
1919
from influxdb_client.service.checks_service import ChecksService
2020
from influxdb_client.service.dbr_ps_service import DBRPsService
2121
from influxdb_client.service.dashboards_service import DashboardsService
22+
from influxdb_client.service.delete_service import DeleteService
2223
from influxdb_client.service.health_service import HealthService
2324
from influxdb_client.service.labels_service import LabelsService
2425
from influxdb_client.service.notification_endpoints_service import NotificationEndpointsService

influxdb_client/client/write/retry.py

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from datetime import datetime, timedelta
55
from itertools import takewhile
66
from random import random
7+
from typing import Callable
78

89
from urllib3 import Retry
910
from urllib3.exceptions import MaxRetryError, ResponseError
@@ -17,25 +18,32 @@ class WritesRetry(Retry):
1718
"""
1819
Writes retry configuration.
1920
20-
:param int jitter_interval: random milliseconds when retrying writes
21-
:param num max_retry_delay: maximum delay when retrying write in seconds
22-
:param int max_retry_time: maximum total retry timeout in seconds, attempt after this timout throws MaxRetryError
23-
:param int total: maximum number of retries
24-
:param num retry_interval: initial first retry delay range in seconds
25-
:param int exponential_base: base for the exponential retry delay,
26-
2721
The next delay is computed as random value between range
28-
`retry_interval * exponential_base^(attempts-1)` and `retry_interval * exponential_base^(attempts)
29-
30-
Example: for retry_interval=5, exponential_base=2, max_retry_delay=125, total=5
31-
retry delays are random distributed values within the ranges of
32-
[5-10, 10-20, 20-40, 40-80, 80-125]
22+
`retry_interval * exponential_base^(attempts-1)` and `retry_interval * exponential_base^(attempts)
3323
24+
Example:
25+
for retry_interval=5, exponential_base=2, max_retry_delay=125, total=5
26+
retry delays are random distributed values within the ranges of
27+
[5-10, 10-20, 20-40, 40-80, 80-125]
3428
"""
3529

3630
def __init__(self, jitter_interval=0, max_retry_delay=125, exponential_base=2, max_retry_time=180, total=5,
37-
retry_interval=5, **kw):
38-
"""Initialize defaults."""
31+
retry_interval=5, retry_callback: Callable[[Exception], int] = None, **kw):
32+
"""
33+
Initialize defaults.
34+
35+
:param int jitter_interval: random milliseconds when retrying writes
36+
:param num max_retry_delay: maximum delay when retrying write in seconds
37+
:param int max_retry_time: maximum total retry timeout in seconds,
38+
attempt after this timout throws MaxRetryError
39+
:param int total: maximum number of retries
40+
:param num retry_interval: initial first retry delay range in seconds
41+
:param int exponential_base: base for the exponential retry delay,
42+
:param Callable[[Exception], int] retry_callback: the callable ``callback`` to run after retryable
43+
error occurred.
44+
The callable must accept one argument:
45+
- `Exception`: an retryable error
46+
"""
3947
super().__init__(**kw)
4048
self.jitter_interval = jitter_interval
4149
self.total = total
@@ -44,6 +52,7 @@ def __init__(self, jitter_interval=0, max_retry_delay=125, exponential_base=2, m
4452
self.max_retry_time = max_retry_time
4553
self.exponential_base = exponential_base
4654
self.retry_timeout = datetime.now() + timedelta(seconds=max_retry_time)
55+
self.retry_callback = retry_callback
4756

4857
def new(self, **kw):
4958
"""Initialize defaults."""
@@ -57,6 +66,8 @@ def new(self, **kw):
5766
kw['max_retry_time'] = self.max_retry_time
5867
if 'exponential_base' not in kw:
5968
kw['exponential_base'] = self.exponential_base
69+
if 'retry_callback' not in kw:
70+
kw['retry_callback'] = self.retry_callback
6071

6172
new = super().new(**kw)
6273
new.retry_timeout = self.retry_timeout
@@ -123,6 +134,9 @@ def increment(self, method=None, url=None, response=None, error=None, _pool=None
123134
if isinstance(parsed_error, InfluxDBError):
124135
message += f" Retry in {parsed_error.retry_after}s."
125136

137+
if self.retry_callback:
138+
self.retry_callback(parsed_error)
139+
126140
logger.warning(message)
127141

128142
return new_retry

0 commit comments

Comments
 (0)
0