@@ -183,21 +183,23 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions()
183
183
def write (self , bucket : str , org : str = None ,
184
184
record : Union [
185
185
str , List ['str' ], Point , List ['Point' ], dict , List ['dict' ], bytes , List ['bytes' ], Observable ] = None ,
186
- write_precision : WritePrecision = DEFAULT_WRITE_PRECISION ) -> None :
186
+ write_precision : WritePrecision = DEFAULT_WRITE_PRECISION , ** kwargs ) -> None :
187
187
"""
188
188
Writes time-series data into influxdb.
189
189
190
190
:param str org: specifies the destination organization for writes; take either the ID or Name interchangeably; if both orgID and org are specified, org takes precedence. (required)
191
191
:param str bucket: specifies the destination bucket for writes (required)
192
192
:param WritePrecision write_precision: specifies the precision for the unix timestamps within the body line-protocol
193
- :param record: Points, line protocol, RxPY Observable to write
193
+ :param record: Points, line protocol, Pandas DataFrame, RxPY Observable to write
194
+ :param data_frame_measurement_name: name of measurement for writing Pandas DataFrame
195
+ :param data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
194
196
195
197
"""
196
198
197
199
if org is None :
198
200
org = self ._influxdb_client .org
199
201
200
- if self ._point_settings .defaultTags and record :
202
+ if self ._point_settings .defaultTags and record is not None :
201
203
for key , val in self ._point_settings .defaultTags .items ():
202
204
if isinstance (record , dict ):
203
205
record .get ("tags" )[key ] = val
@@ -209,9 +211,10 @@ def write(self, bucket: str, org: str = None,
209
211
r .tag (key , val )
210
212
211
213
if self ._write_options .write_type is WriteType .batching :
212
- return self ._write_batching (bucket , org , record , write_precision )
214
+ return self ._write_batching (bucket , org , record ,
215
+ write_precision , ** kwargs )
213
216
214
- final_string = self ._serialize (record , write_precision )
217
+ final_string = self ._serialize (record , write_precision , ** kwargs )
215
218
216
219
_async_req = True if self ._write_options .write_type == WriteType .asynchronous else False
217
220
@@ -235,7 +238,7 @@ def __del__(self):
235
238
self ._disposable = None
236
239
pass
237
240
238
- def _serialize (self , record , write_precision ) -> bytes :
241
+ def _serialize (self , record , write_precision , ** kwargs ) -> bytes :
239
242
_result = b''
240
243
if isinstance (record , bytes ):
241
244
_result = record
@@ -244,40 +247,96 @@ def _serialize(self, record, write_precision) -> bytes:
244
247
_result = record .encode ("utf-8" )
245
248
246
249
elif isinstance (record , Point ):
247
- _result = self ._serialize (record .to_line_protocol (), write_precision = write_precision )
250
+ _result = self ._serialize (record .to_line_protocol (), write_precision , ** kwargs )
248
251
249
252
elif isinstance (record , dict ):
250
253
_result = self ._serialize (Point .from_dict (record , write_precision = write_precision ),
251
- write_precision = write_precision )
254
+ write_precision , ** kwargs )
255
+ elif 'DataFrame' in type (record ).__name__ :
256
+ _result = self ._serialize (self ._data_frame_to_list_of_points (record ,
257
+ precision = write_precision , ** kwargs ),
258
+ write_precision ,
259
+ ** kwargs )
260
+
252
261
elif isinstance (record , list ):
253
- _result = b'\n ' .join ([self ._serialize (item , write_precision = write_precision ) for item in record ])
262
+ _result = b'\n ' .join ([self ._serialize (item , write_precision ,
263
+ ** kwargs ) for item in record ])
254
264
255
265
return _result
256
266
257
- def _write_batching (self , bucket , org , data , precision = DEFAULT_WRITE_PRECISION ):
267
+ def _write_batching (self , bucket , org , data ,
268
+ precision = DEFAULT_WRITE_PRECISION ,
269
+ ** kwargs ):
258
270
_key = _BatchItemKey (bucket , org , precision )
259
271
if isinstance (data , bytes ):
260
272
self ._subject .on_next (_BatchItem (key = _key , data = data ))
261
273
262
274
elif isinstance (data , str ):
263
- self ._write_batching (bucket , org , data .encode ("utf-8" ), precision )
275
+ self ._write_batching (bucket , org , data .encode ("utf-8" ),
276
+ precision , ** kwargs )
264
277
265
278
elif isinstance (data , Point ):
266
- self ._write_batching (bucket , org , data .to_line_protocol (), precision )
279
+ self ._write_batching (bucket , org , data .to_line_protocol (),
280
+ precision , ** kwargs )
267
281
268
282
elif isinstance (data , dict ):
269
- self ._write_batching (bucket , org , Point .from_dict (data , write_precision = precision ), precision )
283
+ self ._write_batching (bucket , org , Point .from_dict (data , write_precision = precision ),
284
+ precision , ** kwargs )
285
+
286
+ elif 'DataFrame' in type (data ).__name__ :
287
+ self ._write_batching (bucket , org , self ._data_frame_to_list_of_points (data , precision , ** kwargs ),
288
+ precision , ** kwargs )
270
289
271
290
elif isinstance (data , list ):
272
291
for item in data :
273
- self ._write_batching (bucket , org , item , precision )
292
+ self ._write_batching (bucket , org , item , precision , ** kwargs )
274
293
275
294
elif isinstance (data , Observable ):
276
- data .subscribe (lambda it : self ._write_batching (bucket , org , it , precision ))
295
+ data .subscribe (lambda it : self ._write_batching (bucket , org , it , precision , ** kwargs ))
277
296
pass
278
297
279
298
return None
280
299
300
+ def _data_frame_to_list_of_points (self , data_frame , precision , ** kwargs ):
301
+ from ..extras import pd
302
+ if not isinstance (data_frame , pd .DataFrame ):
303
+ raise TypeError ('Must be DataFrame, but type was: {0}.'
304
+ .format (type (data_frame )))
305
+
306
+ if 'data_frame_measurement_name' not in kwargs :
307
+ raise TypeError ('"data_frame_measurement_name" is a Required Argument' )
308
+
309
+ if isinstance (data_frame .index , pd .PeriodIndex ):
310
+ data_frame .index = data_frame .index .to_timestamp ()
311
+ else :
312
+ data_frame .index = pd .to_datetime (data_frame .index )
313
+
314
+ if data_frame .index .tzinfo is None :
315
+ data_frame .index = data_frame .index .tz_localize ('UTC' )
316
+
317
+ data = []
318
+
319
+ for c , (row ) in enumerate (data_frame .values ):
320
+ point = Point (measurement_name = kwargs .get ('data_frame_measurement_name' ))
321
+
322
+ for count , (value ) in enumerate (row ):
323
+ column = data_frame .columns [count ]
324
+ data_frame_tag_columns = kwargs .get ('data_frame_tag_columns' )
325
+ if data_frame_tag_columns and column in data_frame_tag_columns :
326
+ point .tag (column , value )
327
+ else :
328
+ point .field (column , value )
329
+
330
+ point .time (data_frame .index [c ], precision )
331
+
332
+ if self ._point_settings .defaultTags :
333
+ for key , val in self ._point_settings .defaultTags .items ():
334
+ point .tag (key , val )
335
+
336
+ data .append (point )
337
+
338
+ return data
339
+
281
340
def _http (self , batch_item : _BatchItem ):
282
341
283
342
logger .debug ("Write time series data into InfluxDB: %s" , batch_item )
0 commit comments