8000 fix: parsing response with new line in column [async/await] (#497) · ashafaei/influxdb-client-python@a1c2615 · GitHub
[go: up one dir, main page]

Skip to content

Commit a1c2615

Browse files
authored
fix: parsing response with new line in column [async/await] (influxdata#497)
1 parent 9e9a10c commit a1c2615

File tree

5 files changed

+64
-10
lines changed

5 files changed

+64
-10
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
## 1.33.0 [unreleased]
22

3+
### Bug Fixes
4+
1. [#497](https://github.com/influxdata/influxdb-client-python/pull/497): Parsing InfluxDB response with new line character in CSV column [async/await]
5+
36
## 1.32.0 [2022-08-25]
47

58
:warning: This release drop supports for Python 3.6. As of 2021-12-23, 3.6 has reached the end-of-life phase of its release cycle. 3.6.15 was the final security release. For more info see: https://peps.python.org/pep-0494/#lifespan

README.rst

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1371,12 +1371,12 @@ How to use Asyncio
13711371
.. marker-asyncio-start
13721372
13731373
Starting from version 1.27.0 for Python 3.7+ the ``influxdb-client`` package supports ``async/await`` based on
1374-
`asyncio <https://docs.python.org/3/library/asyncio.html>`_ and `aiohttp <https://docs.aiohttp.org>`_.
1375-
You can install ``aiohttp`` directly:
1374+
`asyncio <https://docs.python.org/3/library/asyncio.html>`_, `aiohttp <https://docs.aiohttp.org>`_ and `aiocsv <https://pypi.org/project/aiocsv/>`_.
1375+
You can install ``aiohttp`` and ``aiocsv`` directly:
13761376

13771377
.. code-block:: bash
13781378
1379-
$ python -m pip install influxdb-client aiohttp
1379+
$ python -m pip install influxdb-client aiohttp aiocsv
13801380
13811381
or use the ``[async]`` extra:
13821382

influxdb_client/client/flux_csv_parser.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):
9999

100100
async def __aenter__(self) -> 'FluxCsvParser':
101101
"""Initialize CSV reader."""
102-
self._reader = self._response.content
102+
from aiocsv import AsyncReader
103+
self._reader = AsyncReader(_StreamReaderToWithAsyncRead(self._response.content))
103104

104105
return self
105106

@@ -134,11 +135,9 @@ async def _parse_flux_response_async(self):
134135
metadata = _FluxCsvParserMetadata()
135136

136137
try:
137-
async for line in self._reader:
138-
csv = list(csv_parser.reader([line.decode(_UTF_8_encoding)]))
139-
if len(csv) >= 1:
140-
for val in self._parse_flux_response_row(metadata, csv[0]):
141-
yield val
138+
async for csv in self._reader:
139+
for val in self._parse_flux_response_row(metadata, csv):
140+
yield val
142141

143142
# Return latest DataFrame
144143
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_data_frame'):
@@ -371,3 +370,11 @@ def _print_profiler_info(self, flux_record: FluxRecord):
371370
print(f"{name:<20}: \n\n{val}")
372371
elif val is not None:
373372
print(f"{name:<20}: {val:<20}")
373+
374+
375+
class _StreamReaderToWithAsyncRead:
376+
def __init__(self, response):
377+
self.response = response
378+
379+
async def read(self, size: int) -> str:
380+
return (await self.response.read(size)).decode(_UTF_8_encoding)

setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@
3939
]
4040

4141
async_requires = [
42-
'aiohttp>=3.8.1'
42+
'aiohttp>=3.8.1',
43+
'aiocsv>=1.2.2'
4344
]
4445

4546
with open('README.rst', 'r') as f:

tests/test_InfluxDBClientAsync.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from influxdb_client import Point, WritePrecision, BucketsService
1212
from influxdb_client.client.exceptions import InfluxDBError
1313
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
14+
from influxdb_client.client.query_api import QueryOptions
1415
from influxdb_client.client.warnings import MissingPivotFunction
1516
from tests.base_test import generate_name
1617

@@ -312,6 +313,48 @@ async def test_query_and_debug(self):
312313
results = await buckets_service.get_buckets()
313314
self.assertIn("my-bucket", list(map(lambda bucket: bucket.name, results.buckets)))
314315

316+
@async_test
317+
@aioresponses()
318+
async def test_parse_csv_with_new_lines_in_column(self, mocked):
319+
await self.client.close()
320+
self.client = InfluxDBClientAsync("http://localhost")
321+
mocked.post('http://localhost/api/v2/query?org=my-org', status=200, body='''#datatype,string,long,dateTime:RFC3339
322+
#group,false,false,false
323+
#default,_result,,
324+
,result,table,_time
325+
,,0,2022-09-09T10:22:13.744147091Z
326+
327+
#datatype,string,long,string,long,long,long,long,long,long,long,long,long,string,long,long,string
328+
#group,false,false,true,false,false,false,false,false,false,false,false,false,false,false,false,false
329+
#default,_profiler,,,,,,,,,,,,,,,
330+
,result,table,_measurement,TotalDuration,CompileDuration,QueueDuration,PlanDuration,RequeueDuration,ExecuteDuration,Concurrency,MaxAllocated,TotalAllocated,RuntimeErrors,influxdb/scanned-bytes,influxdb/scanned-values,flux/query-plan
331+
,,0,profiler/query,17305459,6292042,116958,0,0,10758125,0,448,0,,0,0,"digraph {
332+
""ReadRange4""
333+
""keep2""
334+
""limit3""
335+
336+
""ReadRange4"" -> ""keep2""
337+
""keep2"" -> ""limit3""
338+
}
339+
340+
"
341+
342+
#datatype,string,long,string,string,string,long,long,long,long,double
343+
#group,false,false,true,false,false,false,false,false,false,false
344+
#default,_profiler,,,,,,,,,
345+
,result,table,_measurement,Type,Label,Count,MinDuration,MaxDuration,DurationSum,MeanDuration
346+
,,1,profiler/operator,*influxdb.readFilterSource,ReadRange4,1,888209,888209,888209,888209
347+
,,1,profiler/operator,*universe.schemaMutationTransformation,keep2,4,1875,42042,64209,16052.25
348+
,,1,profiler/operator,*universe.limitTransformation,limit3,3,1333,38750,47874,15958''')
349+
350+
records = []
351+
await self.client\
352+
.query_api(QueryOptions(profilers=["operator", "query"],
353+
profiler_callback=lambda record: records.append(record))) \
354+
.query("buckets()", "my-org")
355+
356+
self.assertEqual(4, len(records))
357+
315358
async def _prepare_data(self, measurement: str):
316359
_point1 = Point(measurement).tag("location", "Prague").field("temperature", 25.3)
317360
_point2 = Point(measurement).tag("location", "New York").field("temperature", 24.3)

0 commit comments

Comments
 (0)
0