8000 fix: avoid to generate an empty write batch (#85) · kelseiv/influxdb-client-python@ab3915f · GitHub
[go: up one dir, main page]

Skip to content

Commit ab3915f

Browse files
authored
fix: avoid to generate an empty write batch (influxdata#85)
1 parent df77ad1 commit ab3915f

File tree

3 files changed

+36
-4
lines changed

3 files changed

+36
-4
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
## 1.7.0 [unreleased]
22

3+
### Bug Fixes
4+
1. [#85](https://github.com/influxdata/influxdb-client-python/issues/85): Fixed a possibility to generate empty write batch
5+
36
## 1.6.0 [2020-04-17]
47

58
### Documentation

influxdb_client/client/write_api.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
# coding: utf-8
22
import logging
3+
import os
34
from datetime import timedelta
45
from enum import Enum
56
from random import random
67
from time import sleep
78
from typing import Union, List
89

9-
import os
10-
1110
import rx
1211
from rx import operators as ops, Observable
1312
from rx.scheduler import ThreadPoolScheduler
@@ -172,8 +171,9 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions()
172171
ops.map(lambda xs: _BatchItem(key=group.key, data=_body_reduce(xs), size=len(xs))))),
173172
ops.merge_all())),
174173
# Write data into InfluxDB (possibility to retry if its fail)
175-
ops.map(mapper=lambda batch: self._retryable(data=batch, delay=self._jitter_delay())), #
176-
ops.merge_all())\
174+
ops.filter(lambda batch: batch.size > 0),
175+
ops.map(mapper=lambda batch: self._retryable(data=batch, delay=self._jitter_delay())),
176+
ops.merge_all()) \
177177
.subscribe(self._on_next, self._on_error, self._on_complete)
178178

179179
else:

tests/test_WriteApiBatching.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,35 @@ def test_user_agent_header(self):
378378
self.assertEqual(1, len(requests))
379379
self.assertEqual(f'influxdb-client-python/{influxdb_client.__version__}', requests[0].headers['User-Agent'])
380380

381+
def test_to_low_flush_interval(self):
382+
383+
self._write_client.__del__()
384+
self._write_client = WriteApi(influxdb_client=self.influxdb_client,
385+
write_options=WriteOptions(batch_size=8,
386+
flush_interval=1,
387+
jitter_interval=1000))
388+
389+
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204)
390+
391+
for i in range(50):
392+
val_one = float(i)
393+
val_two = float(i) + 0.5
394+
point_one = Point("OneMillis").tag("sensor", "sensor1").field("PSI", val_one).time(time=i)
395+
point_two = Point("OneMillis").tag("sensor", "sensor2").field("PSI", val_two).time(time=i)
396+
397+
self._write_client.write("my-bucket", "my-org", [point_one, point_two])
398+
time.sleep(0.1)
399+
400+
self._write_client.__del__()
401+
402+
_requests = httpretty.httpretty.latest_requests
403+
404+
for _request in _requests:
405+
body = _request.parsed_body
406+
self.assertTrue(body, msg="Parsed body should be not empty " + str(_request))
407+
408+
httpretty.reset()
409+
381410

382411
if __name__ == '__main__':
383412
unittest.main()

0 commit comments

Comments
 (0)
0