feat: support for writing pandas DataFrame (#88) · timbeccue/influxdb-client-python@a1f9826 · GitHub
[go: up one dir, main page]

Skip to content

Commit a1f9826

Browse files
authored
feat: support for writing pandas DataFrame (influxdata#88)
1 parent ab3915f commit a1f9826

File tree

5 files changed

+206
-15
lines changed

5 files changed

+206
-15
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
## 1.7.0 [unreleased]
22

3+
### Features
4+
1. [#79](https://github.com/influxdata/influxdb-client-python/issues/79): Added support for writing Pandas DataFrame
5+
36
### Bug Fixes
47
1. [#85](https://github.com/influxdata/influxdb-client-python/issues/85): Fixed a possibility to generate empty write batch
58

README.rst

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ InfluxDB 2.0 client features
4949
- `Line Protocol <https://docs.influxdata.com/influxdb/v1.6/write_protocols/line_protocol_tutorial>`_
5050
- `Data Point <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py#L16>`__
5151
- `RxPY <https://rxpy.readthedocs.io/en/latest/>`__ Observable
52+
- `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
5253
- `How to writes <#writes>`_
5354
- `InfluxDB 2.0 API <https://github.com/influxdata/influxdb/blob/master/http/swagger.yml>`_ client for management
5455
- the client is generated from the `swagger <https://github.com/influxdata/influxdb/blob/master/http/swagger.yml>`_ by using the `openapi-generator <https://github.com/OpenAPITools/openapi-generator>`_
@@ -219,6 +220,7 @@ The data could be written as
219220
3. Dictionary style mapping with keys: ``measurement``, ``tags``, ``fields`` and ``time``
220221
4. List of above items
221222
5. A ``batching`` type of write also supports an ``Observable`` that produce one of an above item
223+
6. `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
222224

223225

224226
Batching
@@ -302,6 +304,16 @@ The batching is configurable by ``write_options``\ :
302304
303305
_write_client.write("my-bucket", "my-org", _data)
304306
307+
"""
308+
Write Pandas DataFrame
309+
"""
310+
_now = pd.Timestamp.now('UTC')
311+
_data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
312+
index=[_now, _now + timedelta(hours=1)],
313+
columns=["location", "water_level"])
314+
315+
_write_client.write(bucket.name, record=_data_frame, data_frame_measurement_name='h2o_feet',
316+
data_frame_tag_columns=['location'])
305317
306318
"""
307319
Close client

influxdb_client/client/write_api.py

Lines changed: 74 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -183,21 +183,23 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions()
183183
def write(self, bucket: str, org: str = None,
184184
record: Union[
185185
str, List['str'], Point, List['Point'], dict, List['dict'], bytes, List['bytes'], Observable] = None,
186-
write_precision: WritePrecision = DEFAULT_WRITE_PRECISION) -> None:
186+
write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs) -> None:
187187
"""
188188
Writes time-series data into influxdb.
189189
190190
:param str org: specifies the destination organization for writes; take either the ID or Name interchangeably; if both orgID and org are specified, org takes precedence. (required)
191191
:param str bucket: specifies the destination bucket for writes (required)
192192
:param WritePrecision write_precision: specifies the precision for the unix timestamps within the body line-protocol
193-
:param record: Points, line protocol, RxPY Observable to write
193+
:param record: Points, line protocol, Pandas DataFrame, RxPY Observable to write
194+
:param data_frame_measurement_name: name of measurement for writing Pandas DataFrame
195+
:param data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
194196
195197
"""
196198

197199
if org is None:
198200
org = self._influxdb_client.org
199201

200-
if self._point_settings.defaultTags and record:
202+
if self._point_settings.defaultTags and record is not None:
201203
for key, val in self._point_settings.defaultTags.items():
202204
if isinstance(record, dict):
203205
record.get("tags")[key] = val
@@ -209,9 +211,10 @@ def write(self, bucket: str, org: str = None,
209211
r.tag(key, val)
210212

211213
if self._write_options.write_type is WriteType.batching:
212-
return self._write_batching(bucket, org, record, write_precision)
214+
return self._write_batching(bucket, org, record,
215+
write_precision, **kwargs)
213216

214-
final_string = self._serialize(record, write_precision)
217+
final_string = self._serialize(record, write_precision, **kwargs)
215218

216219
_async_req = True if self._write_options.write_type == WriteType.asynchronous else False
217220

@@ -235,7 +238,7 @@ def __del__(self):
235238
self._disposable = None
236239
pass
237240

238-
def _serialize(self, record, write_precision) -> bytes:
241+
def _serialize(self, record, write_precision, **kwargs) -> bytes:
239242
_result = b''
240243
if isinstance(record, bytes):
241244
_result = record
@@ -244,40 +247,96 @@ def _serialize(self, record, write_precision) -> bytes:
244247
_result = record.encode("utf-8")
245248

246249
elif isinstance(record, Point):
247-
_result = self._serialize(record.to_line_protocol(), write_precision=write_precision)
250+
_result = self._serialize(record.to_line_protocol(), write_precision, **kwargs)
248251

249252
elif isinstance(record, dict):
250253
_result = self._serialize(Point.from_dict(record, write_precision=write_precision),
251-
write_precision=write_precision)
254+
write_precision, **kwargs)
255+
elif 'DataFrame' in type(record).__name__:
256+
_result = self._serialize(self._data_frame_to_list_of_points(record,
257+
precision=write_precision, **kwargs),
258+
write_precision,
259+
**kwargs)
260+
252261
elif isinstance(record, list):
253-
_result = b'\n'.join([self._serialize(item, write_precision=write_precision) for item in record])
262+
_result = b'\n'.join([self._serialize(item, write_precision,
263+
**kwargs) for item in record])
254264

255265
return _result
256266

257-
def _write_batching(self, bucket, org, data, precision=DEFAULT_WRITE_PRECISION):
267+
def _write_batching(self, bucket, org, data,
268+
precision=DEFAULT_WRITE_PRECISION,
269+
**kwargs):
258270
_key = _BatchItemKey(bucket, org, precision)
259271
if isinstance(data, bytes):
260272
self._subject.on_next(_BatchItem(key=_key, data=data))
261273

262274
elif isinstance(data, str):
263-
self._write_batching(bucket, org, data.encode("utf-8"), precision)
275+
self._write_batching(bucket, org, data.encode("utf-8"),
276+
precision, **kwargs)
264277

265278
elif isinstance(data, Point):
266-
self._write_batching(bucket, org, data.to_line_protocol(), precision)
279+
self._write_batching(bucket, org, data.to_line_protocol(),
280+
precision, **kwargs)
267281

268282
elif isinstance(data, dict):
269-
self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision), precision)
283+
self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision),
284+
precision, **kwargs)
285+
286+
elif 'DataFrame' in type(data).__name__:
287+
self._write_batching(bucket, org, self._data_frame_to_list_of_points(data, precision, **kwargs),
288+
precision, **kwargs)
270289

271290
elif isinstance(data, list):
272291
for item in data:
273-
self._write_batching(bucket, org, item, precision)
292+
self._write_batching(bucket, org, item, precision, **kwargs)
274293

275294
elif isinstance(data, Observable):
276-
data.subscribe(lambda it: self._write_batching(bucket, org, it, precision))
295+
data.subscribe(lambda it: self._write_batching(bucket, org, it, precision, **kwargs))
277296
pass
278297

279298
return None
280299

300+
def _data_frame_to_list_of_points(self, data_frame, precision, **kwargs):
301+
from ..extras import pd
302+
if not isinstance(data_frame, pd.DataFrame):
303+
raise TypeError('Must be DataFrame, but type was: {0}.'
304+
.format(type(data_frame)))
305+
306+
if 'data_frame_measurement_name' not in kwargs:
307+
raise TypeError('"data_frame_measurement_name" is a Required Argument')
308+
309+
if isinstance(data_frame.index, pd.PeriodIndex):
310+
data_frame.index = data_frame.index.to_timestamp()
311+
else:
312+
data_frame.index = pd.to_datetime(data_frame.index)
313+
314+
if data_frame.index.tzinfo is None:
315+
data_frame.index = data_frame.index.tz_localize('UTC')
316+
317+
data = []
318+
319+
for c, (row) in enumerate(data_frame.values):
320+
point = Point(measurement_name=kwargs.get('data_frame_measurement_name'))
321+
322+
for count, (value) in enumerate(row):
323+
column = data_frame.columns[count]
324+
data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
325+
if data_frame_tag_columns and column in data_frame_tag_columns:
326+
point.tag(column, value)
327+
else:
328+
point.field(column, value)
329+
330+
point.time(data_frame.index[c], precision)
331+
332+
if self._point_settings.defaultTags:
333+
for key, val in self._point_settings.defaultTags.items():
334+
point.tag(key, val)
335+
336+
data.append(point)
337+
338+
return data
339+
281340
def _http(self, batch_item: _BatchItem):
282341

283342
logger.debug("Write time series data into InfluxDB: %s", batch_item)

tests/test_WriteApi.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import os
77
import unittest
88
import time
9+
from datetime import timedelta
910
from multiprocessing.pool import ApplyResult
1011

1112
from influxdb_client import Point, WritePrecision, InfluxDBClient
@@ -224,6 +225,57 @@ def test_write_bytes(self):
224225

225226
self.delete_test_bucket(_bucket)
226227

228+
def test_write_data_frame(self):
229+
from influxdb_client.extras import pd
230+
231+
bucket = self.create_test_bucket()
232+
233+
now = pd.Timestamp('1970-01-01 00:00+00:00')
234+
data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
235+
index=[now + timedelta(hours=1), now + timedelta(hours=2)],
236+
columns=["location", "water_level"])
237+
238+
self.write_client.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet',
239+
data_frame_tag_columns=['location'])
240+
241+
result = self.query_api.query(
242+
"from(bucket:\"" + bucket.name + "\") |> range(start: 1970-01-01T00:00:00.000000001Z)", self.org)
243+
244+
self.assertEqual(1, len(result))
245+
self.assertEqual(2, len(result[0].records))
246+
247+
self.assertEqual(result[0].records[0].get_measurement(), "h2o_feet")
248+
self.assertEqual(result[0].records[0].get_value(), 1.0)
249+
self.assertEqual(result[0].records[0].values.get("location"), "coyote_creek")
250+
self.assertEqual(result[0].records[0].get_field(), "water_level")
251+
self.assertEqual(result[0].records[0].get_time(),
252+
datetime.datetime(1970, 1, 1, 1, 0, tzinfo=datetime.timezone.utc))
253+
254+
self.assertEqual(result[0].records[1].get_measurement(), "h2o_feet")
255+
self.assertEqual(result[0].records[1].get_value(), 2.0)
256+
self.assertEqual(result[0].records[1].values.get("location"), "coyote_creek")
257+
self.assertEqual(result[0].records[1].get_field(), "water_level")
258+
self.assertEqual(result[0].records[1].get_time(),
259+
datetime.datetime(1970, 1, 1, 2, 0, tzinfo=datetime.timezone.utc))
260+
261+
self.delete_test_bucket(bucket)
262+
263+
def test_write_data_frame_without_measurement_name(self):
264+
from influxdb_client.extras import pd
265+
266+
bucket = self.create_test_bucket()
267+
268+
now = pd.Timestamp('1970-01-01 00:00+00:00')
269+
data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
270+
index=[now + timedelta(hours=1), now + timedelta(hours=2)],
271+
columns=["location", "water_level"])
272+
273+
with self.assertRaises(TypeError) as cm:
274+
self.write_client.write(bucket.name, record=data_frame)
275+
exception = cm.exception
276+
277+
self.assertEqual('"data_frame_measurement_name" is a Required Argument', exception.__str__())
278+
227279
def test_use_default_org(self):
228280
bucket = self.create_test_bucket()
229281

@@ -362,6 +414,44 @@ def test_use_default_tags_with_dictionaries(self):
362414

363415
self.delete_test_bucket(bucket)
364416

417+
def test_use_default_tags_with_data_frame(self):
418+
from influxdb_client.extras import pd
419+
420+
bucket = self.create_test_bucket()
421+
422+
now = pd.Timestamp('1970-01-01 00:00+00:00')
423+
data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
424+
index=[now + timedelta(hours=1), now + timedelta(hours=2)],
425+
columns=["location", "water_level"])
426+
427+
self.write_client.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet',
428+
data_frame_tag_columns=['location'])
429+
430+
time.sleep(1)
431+
432+
query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)'
433+
434+
flux_result = self.client.query_api().query(query)
435+
436+
self.assertEqual(1, len(flux_result))
437+
438+
records = flux_result[0].records
439+
440+
self.assertEqual(2, len(records))
441+
442+
rec = records[0]
443+
rec2 = records[1]
444+
445+
self.assertEqual(self.id_tag, rec["id"])
446+
self.assertEqual(self.customer_tag, rec["customer"])
447+
self.assertEqual("LA", rec[self.data_center_key])
448+
449+
self.assertEqual(self.id_tag, rec2["id"])
450+
self.assertEqual(self.customer_tag, rec2["customer"])
451+
self.assertEqual("LA", rec2[self.data_center_key])
452+
453+
self.delete_test_bucket(bucket)
454+
365455
def test_write_bytes(self):
366456
bucket = self.create_test_bucket()
367457

tests/test_WriteApiBatching.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,33 @@ def test_to_low_flush_interval(self):
407407

408408
httpretty.reset()
409409

410+
def test_batching_data_frame(self):
411+
from influxdb_client.extras import pd
412+
413+
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204)
414+
httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204)
415+
416+
data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0],
417+
["coyote_creek", 3.0], ["coyote_creek", 4.0]],
418+
index=[1, 2, 3, 4],
419+
columns=["location", "level water_level"])
420+
421+
self._write_client.write("my-bucket", "my-org", record=data_frame,
422+
data_frame_measurement_name='h2o_feet',
423+
data_frame_tag_columns=['location'])
424+
425+
time.sleep(1)
426+
427+
_requests = httpretty.httpretty.latest_requests
428+
429+
self.assertEqual(2, len(_requests))
430+
_request1 = "h2o_feet,location=coyote_creek level\\ water_level=1.0 1\n" \
431+
"h2o_feet,location=coyote_creek level\\ water_level=2.0 2"
432+
_request2 = "h2o_feet,location=coyote_creek level\\ water_level=3.0 3\n" \
433+
"h2o_feet,location=coyote_creek level\\ water_level=4.0 4"
434+
435+
self.assertEqual(_request1, _requests[0].parsed_body)
436+
self.assertEqual(_request2, _requests[1].parsed_body)
410437

411438
if __name__ == '__main__':
412439
unittest.main()

0 commit comments

Comments
 (0)
0