Improve line-protocol by tzonghao · Pull Request #224 · influxdata/influxdb-python · GitHub
[go: up one dir, main page]

Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Improve line-protocol #224

Merged
merged 6 commits into from
Aug 6, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions influxdb/_dataframe_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def write_points(self, dataframe, measurement, tags=None,
:param dataframe: data points in a DataFrame
:param measurement: name of measurement
:param tags: dictionary of tags, with string key-values
:param time_precision: [Optional, default 's'] Either 's', 'ms', 'u'
:param time_precision: [Optional, default None] Either 's', 'ms', 'u'
or 'n'.
:param batch_size: [Optional] Value to write the points in batches
instead of all at one time. Useful for when doing data dumps from
Expand All @@ -55,15 +55,14 @@ def write_points(self, dataframe, measurement, tags=None,
end_index = (batch + 1) * batch_size
points = self._convert_dataframe_to_json(
dataframe.ix[start_index:end_index].copy(),
measurement,
tags
measurement, tags, time_precision
)
super(DataFrameClient, self).write_points(
points, time_precision, database, retention_policy)
return True
else:
points = self._convert_dataframe_to_json(
dataframe, measurement, tags
dataframe, measurement, tags, time_precision
)
super(DataFrameClient, self).write_points(
points, time_precision, database, retention_policy)
Expand Down Expand Up @@ -116,7 +115,8 @@ def _to_dataframe(self, rs):
result[key] = df
return result

def _convert_dataframe_to_json(self, dataframe, measurement, tags=None):
def _convert_dataframe_to_json(self, dataframe, measurement, tags=None,
time_precision=None):

if not isinstance(dataframe, pd.DataFrame):
raise TypeError('Must be DataFrame, but type was: {}.'
Expand All @@ -136,11 +136,18 @@ def _convert_dataframe_to_json(self, dataframe, measurement, tags=None):
# Convert dtype for json serialization
dataframe = dataframe.astype('object')

precision_factor = {
"n": 1,
"u": 1e3,
"ms": 1e6,
"s": 1e9
}.get(time_precision, 1)

points = [
{'measurement': measurement,
'tags': tags if tags else {},
'fields': rec,
'time': ts.isoformat()
'time': int(ts.value / precision_factor)
}
for ts, rec in zip(dataframe.index, dataframe.to_dict('record'))]
return points
Expand All @@ -150,8 +157,8 @@ def _datetime_to_epoch(self, datetime, time_precision='s'):
if time_precision == 's':
return seconds
elif time_precision == 'ms':
return seconds * 10 ** 3
return seconds * 1e3
elif time_precision == 'u':
return seconds * 10 ** 6
return seconds * 1e6
elif time_precision == 'n':
return seconds * 10 ** 9
return seconds * 1e9
7 changes: 6 additions & 1 deletion influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,16 @@ def write(self, data, params=None, expected_response_code=204):
headers = self._headers
headers['Content-type'] = 'application/octet-stream'

if params:
precision = params.get('precision')
else:
precision = None

self.request(
url="write",
method='POST',
params=params,
data=make_lines(data).encode('utf-8'),
data=make_lines(data, precision).encode('utf-8'),
expected_response_code=expected_response_code,
headers=headers
)
Expand Down
57 changes: 36 additions & 21 deletions influxdb/line_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,28 @@
from datetime import datetime

from dateutil.parser import parse
from pytz import utc
from six import binary_type, text_type


def _convert_timestamp(timestamp):
def _convert_timestamp(timestamp, precision=None):
if isinstance(timestamp, int):
return timestamp
return timestamp # assume precision is correct if timestamp is int
if isinstance(_force_text(timestamp), text_type):
timestamp = parse(timestamp)
if isinstance(timestamp, datetime):
if timestamp.tzinfo:
timestamp = timestamp.astimezone(utc)
timestamp.replace(tzinfo=None)
return (
timegm(timestamp.timetuple()) * 1e9 +
ns = (
timegm(timestamp.utctimetuple()) * 1e9 +
timestamp.microsecond * 1e3
)
if precision is None or precision == 'n':
return ns
elif precision == 'u':
return ns / 1e3
elif precision == 'ms':
return ns / 1e6
elif precision == 's':
return ns / 1e9

raise ValueError(timestamp)


Expand Down Expand Up @@ -58,46 +63,56 @@ def _force_text(data):
return data


def make_lines(data):
def make_lines(data, precision=None):
"""
Extracts the points from the given dict and returns a Unicode string
matching the line protocol introduced in InfluxDB 0.9.0.
"""
lines = ""
lines = []
static_tags = data.get('tags', None)
for point in data['points']:
elements = []

# add measurement name
lines += _escape_tag(_force_text(
measurement = _escape_tag(_force_text(
point.get('measurement', data.get('measurement'))
)) + ","
))
key_values = [measurement]

# add tags
if static_tags is None:
tags = point.get('tags', {})
else:
tags = copy(static_tags)
tags.update(point.get('tags', {}))

# tags should be sorted client-side to take load off server
for tag_key in sorted(tags.keys()):
key = _escape_tag(tag_key)
value = _escape_tag(tags[tag_key])
if key != '' and value != '':
lines += "{key}={value},".format(key=key, value=value)
lines = lines[:-1] + " " # strip the trailing comma
key_values.append("{key}={value}".format(key=key, value=value))
key_values = ','.join(key_values)
elements.append(key_values)

# add fields
field_values = []
for field_key in sorted(point['fields'].keys()):
lines += "{key}={value},".format(
field_values.append("{key}={value}".format(
key=_escape_tag(field_key),
value=_escape_value(point['fields'][field_key]),
)
lines = lines[:-1] # strip the trailing comma
))
field_values = ','.join(field_values)
elements.append(field_values)

# add timestamp
if 'time' in point:
lines += " " + _force_text(str(int(
_convert_timestamp(point['time'])
timestamp = _force_text(str(int(
_convert_timestamp(point['time'], precision)
)))
elements.append(timestamp)

lines += "\n"
return lines
line = ' '.join(elements)
lines.append(line)
lines = '\n'.join(lines)
return lines + '\n'
26 changes: 22 additions & 4 deletions influxdb/tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,17 +275,35 @@ def test_write_points_with_precision(self):
)

cli = InfluxDBClient(database='db')
cli.write_points(
self.dummy_points,
time_precision='n'
)

cli.write_points(self.dummy_points, time_precision='n')
self.assertEqual(
b"cpu_load_short,host=server01,region=us-west "
b"value=0.64 1257894000000000000\n",
m.last_request.body,
)

cli.write_points(self.dummy_points, time_precision='u')
self.assertEqual(
b"cpu_load_short,host=server01,region=us-west "
b"value=0.64 1257894000000000\n",
m.last_request.body,
)

cli.write_points(self.dummy_points, time_precision='ms')
self.assertEqual(
b"cpu_load_short,host=server01,region=us-west "
b"value=0.64 1257894000000\n",
m.last_request.body,
)

cli.write_points(self.dummy_points, time_precision='s')
self.assertEqual(
b"cpu_load_short,host=server01,region=us-west "
b"value=0.64 1257894000\n",
m.last_request.body,
)

def test_write_points_bad_precision(self):
cli = InfluxDBClient()
with self.assertRaisesRegexp(
Expand Down
49 changes: 26 additions & 23 deletions influxdb/tests/dataframe_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,38 +119,41 @@ def test_write_points_from_dataframe_with_time_precision(self):
"http://localhost:8086/write",
status_code=204)

points = {
'database': 'db',
'points': [
{'time': '1970-01-01T00:00:00+00:00',
'fields': {
'column_one': '1',
'column_three': 1.0,
'column_two': 1},
'tags': {},
'measurement': 'foo'},
{'time': '1970-01-01T01:00:00+00:00',
'fields': {
'column_one': '2',
'column_three': 2.0,
'column_two': 2},
'tags': {},
'measurement': 'foo'}]
}

cli = DataFrameClient(database='db')
measurement = "foo"

cli.write_points(dataframe, measurement, time_precision='s')
self.assertEqual(m.last_request.qs['precision'], ['s'])
self.assertEqual(
b'foo column_one="1",column_three=1.0,column_two=1 0\nfoo '
b'column_one="2",column_three=2.0,column_two=2 3600\n',
m.last_request.body,
)

cli.write_points(dataframe, measurement, time_precision='m')
points.update(precision='m')
self.assertEqual(m.last_request.qs['precision'], ['m'])
cli.write_points(dataframe, measurement, time_precision='ms')
self.assertEqual(m.last_request.qs['precision'], ['ms'])
self.assertEqual(
b'foo column_one="1",column_three=1.0,column_two=1 0\nfoo '
b'column_one="2",column_three=2.0,column_two=2 3600000\n',
m.last_request.body,
)

cli.write_points(dataframe, measurement, time_precision='u')
points.update(precision='u')
self.assertEqual(m.last_request.qs['precision'], ['u'])
self.assertEqual(
b'foo column_one="1",column_three=1.0,column_two=1 0\nfoo '
b'column_one="2",column_three=2.0,column_two=2 3600000000\n',
m.last_request.body,
)

cli.write_points(dataframe, measurement, time_precision='n')
self.assertEqual(m.last_request.qs['precision'], ['n'])
self.assertEqual(
b'foo column_one="1",column_three=1.0,column_two=1 0\n'
b'foo column_one="2",column_three=2.0,column_two=2 '
b'3600000000000\n',
m.last_request.body,
)

@raises(TypeError)
def test_write_points_from_dataframe_fails_without_time_index(self):
Expand Down
0