diff --git a/influxdb/_dataframe_client.py b/influxdb/_dataframe_client.py index 1d640256..6d9989f6 100644 --- a/influxdb/_dataframe_client.py +++ b/influxdb/_dataframe_client.py @@ -39,7 +39,7 @@ def write_points(self, dataframe, measurement, tags=None, :param dataframe: data points in a DataFrame :param measurement: name of measurement :param tags: dictionary of tags, with string key-values - :param time_precision: [Optional, default 's'] Either 's', 'ms', 'u' + :param time_precision: [Optional, default None] Either 's', 'ms', 'u' or 'n'. :param batch_size: [Optional] Value to write the points in batches instead of all at one time. Useful for when doing data dumps from @@ -55,15 +55,14 @@ def write_points(self, dataframe, measurement, tags=None, end_index = (batch + 1) * batch_size points = self._convert_dataframe_to_json( dataframe.ix[start_index:end_index].copy(), - measurement, - tags + measurement, tags, time_precision ) super(DataFrameClient, self).write_points( points, time_precision, database, retention_policy) return True else: points = self._convert_dataframe_to_json( - dataframe, measurement, tags + dataframe, measurement, tags, time_precision ) super(DataFrameClient, self).write_points( points, time_precision, database, retention_policy) @@ -116,7 +115,8 @@ def _to_dataframe(self, rs): result[key] = df return result - def _convert_dataframe_to_json(self, dataframe, measurement, tags=None): + def _convert_dataframe_to_json(self, dataframe, measurement, tags=None, + time_precision=None): if not isinstance(dataframe, pd.DataFrame): raise TypeError('Must be DataFrame, but type was: {}.' @@ -136,11 +136,18 @@ def _convert_dataframe_to_json(self, dataframe, measurement, tags=None): # Convert dtype for json serialization dataframe = dataframe.astype('object') + precision_factor = { + "n": 1, + "u": 1e3, + "ms": 1e6, + "s": 1e9 + }.get(time_precision, 1) + points = [ {'measurement': measurement, 'tags': tags if tags else {}, 'fields': rec, - 'time': ts.isoformat() + 'time': int(ts.value / precision_factor) } for ts, rec in zip(dataframe.index, dataframe.to_dict('record'))] return points @@ -150,8 +157,8 @@ def _datetime_to_epoch(self, datetime, time_precision='s'): if time_precision == 's': return seconds elif time_precision == 'ms': - return seconds * 10 ** 3 + return seconds * 1e3 elif time_precision == 'u': - return seconds * 10 ** 6 + return seconds * 1e6 elif time_precision == 'n': - return seconds * 10 ** 9 + return seconds * 1e9 diff --git a/influxdb/client.py b/influxdb/client.py index 6dae571f..7bab357d 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -259,11 +259,16 @@ def write(self, data, params=None, expected_response_code=204): headers = self._headers headers['Content-type'] = 'application/octet-stream' + if params: + precision = params.get('precision') + else: + precision = None + self.request( url="write", method='POST', params=params, - data=make_lines(data).encode('utf-8'), + data=make_lines(data, precision).encode('utf-8'), expected_response_code=expected_response_code, headers=headers ) diff --git a/influxdb/line_protocol.py b/influxdb/line_protocol.py index 403e463c..c47d4dc8 100644 --- a/influxdb/line_protocol.py +++ b/influxdb/line_protocol.py @@ -6,23 +6,28 @@ from datetime import datetime from dateutil.parser import parse -from pytz import utc from six import binary_type, text_type -def _convert_timestamp(timestamp): +def _convert_timestamp(timestamp, precision=None): if isinstance(timestamp, int): - return timestamp + return timestamp # assume precision is correct if timestamp is int if isinstance(_force_text(timestamp), text_type): timestamp = parse(timestamp) if isinstance(timestamp, datetime): - if timestamp.tzinfo: - timestamp = timestamp.astimezone(utc) - timestamp.replace(tzinfo=None) - return ( - timegm(timestamp.timetuple()) * 1e9 + + ns = ( + timegm(timestamp.utctimetuple()) * 1e9 + timestamp.microsecond * 1e3 ) + if precision is None or precision == 'n': + return ns + elif precision == 'u': + return ns / 1e3 + elif precision == 'ms': + return ns / 1e6 + elif precision == 's': + return ns / 1e9 + raise ValueError(timestamp) @@ -58,18 +63,21 @@ def _force_text(data): return data -def make_lines(data): +def make_lines(data, precision=None): """ Extracts the points from the given dict and returns a Unicode string matching the line protocol introduced in InfluxDB 0.9.0. """ - lines = "" + lines = [] static_tags = data.get('tags', None) for point in data['points']: + elements = [] + # add measurement name - lines += _escape_tag(_force_text( + measurement = _escape_tag(_force_text( point.get('measurement', data.get('measurement')) - )) + "," + )) + key_values = [measurement] # add tags if static_tags is None: @@ -77,27 +85,34 @@ def make_lines(data): else: tags = copy(static_tags) tags.update(point.get('tags', {})) + # tags should be sorted client-side to take load off server for tag_key in sorted(tags.keys()): key = _escape_tag(tag_key) value = _escape_tag(tags[tag_key]) if key != '' and value != '': - lines += "{key}={value},".format(key=key, value=value) - lines = lines[:-1] + " " # strip the trailing comma + key_values.append("{key}={value}".format(key=key, value=value)) + key_values = ','.join(key_values) + elements.append(key_values) # add fields + field_values = [] for field_key in sorted(point['fields'].keys()): - lines += "{key}={value},".format( + field_values.append("{key}={value}".format( key=_escape_tag(field_key), value=_escape_value(point['fields'][field_key]), - ) - lines = lines[:-1] # strip the trailing comma + )) + field_values = ','.join(field_values) + elements.append(field_values) # add timestamp if 'time' in point: - lines += " " + _force_text(str(int( - _convert_timestamp(point['time']) + timestamp = _force_text(str(int( + _convert_timestamp(point['time'], precision) ))) + elements.append(timestamp) - lines += "\n" - return lines + line = ' '.join(elements) + lines.append(line) + lines = '\n'.join(lines) + return lines + '\n' diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index 43ecb68e..df73a66b 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -275,17 +275,35 @@ def test_write_points_with_precision(self): ) cli = InfluxDBClient(database='db') - cli.write_points( - self.dummy_points, - time_precision='n' - ) + cli.write_points(self.dummy_points, time_precision='n') self.assertEqual( b"cpu_load_short,host=server01,region=us-west " b"value=0.64 1257894000000000000\n", m.last_request.body, ) + cli.write_points(self.dummy_points, time_precision='u') + self.assertEqual( + b"cpu_load_short,host=server01,region=us-west " + b"value=0.64 1257894000000000\n", + m.last_request.body, + ) + + cli.write_points(self.dummy_points, time_precision='ms') + self.assertEqual( + b"cpu_load_short,host=server01,region=us-west " + b"value=0.64 1257894000000\n", + m.last_request.body, + ) + + cli.write_points(self.dummy_points, time_precision='s') + self.assertEqual( + b"cpu_load_short,host=server01,region=us-west " + b"value=0.64 1257894000\n", + m.last_request.body, + ) + def test_write_points_bad_precision(self): cli = InfluxDBClient() with self.assertRaisesRegexp( diff --git a/influxdb/tests/dataframe_client_test.py b/influxdb/tests/dataframe_client_test.py index d4aac040..6ad42238 100644 --- a/influxdb/tests/dataframe_client_test.py +++ b/influxdb/tests/dataframe_client_test.py @@ -119,38 +119,41 @@ def test_write_points_from_dataframe_with_time_precision(self): "http://localhost:8086/write", status_code=204) - points = { - 'database': 'db', - 'points': [ - {'time': '1970-01-01T00:00:00+00:00', - 'fields': { - 'column_one': '1', - 'column_three': 1.0, - 'column_two': 1}, - 'tags': {}, - 'measurement': 'foo'}, - {'time': '1970-01-01T01:00:00+00:00', - 'fields': { - 'column_one': '2', - 'column_three': 2.0, - 'column_two': 2}, - 'tags': {}, - 'measurement': 'foo'}] - } - cli = DataFrameClient(database='db') measurement = "foo" cli.write_points(dataframe, measurement, time_precision='s') self.assertEqual(m.last_request.qs['precision'], ['s']) + self.assertEqual( + b'foo column_one="1",column_three=1.0,column_two=1 0\nfoo ' + b'column_one="2",column_three=2.0,column_two=2 3600\n', + m.last_request.body, + ) - cli.write_points(dataframe, measurement, time_precision='m') - points.update(precision='m') - self.assertEqual(m.last_request.qs['precision'], ['m']) + cli.write_points(dataframe, measurement, time_precision='ms') + self.assertEqual(m.last_request.qs['precision'], ['ms']) + self.assertEqual( + b'foo column_one="1",column_three=1.0,column_two=1 0\nfoo ' + b'column_one="2",column_three=2.0,column_two=2 3600000\n', + m.last_request.body, + ) cli.write_points(dataframe, measurement, time_precision='u') - points.update(precision='u') self.assertEqual(m.last_request.qs['precision'], ['u']) + self.assertEqual( + b'foo column_one="1",column_three=1.0,column_two=1 0\nfoo ' + b'column_one="2",column_three=2.0,column_two=2 3600000000\n', + m.last_request.body, + ) + + cli.write_points(dataframe, measurement, time_precision='n') + self.assertEqual(m.last_request.qs['precision'], ['n']) + self.assertEqual( + b'foo column_one="1",column_three=1.0,column_two=1 0\n' + b'foo column_one="2",column_three=2.0,column_two=2 ' + b'3600000000000\n', + m.last_request.body, + ) @raises(TypeError) def test_write_points_from_dataframe_fails_without_time_index(self):