From a1ffa0a5ab8bd377d5efabee7d899dc96c3000a1 Mon Sep 17 00:00:00 2001 From: "tzonghao.chen" Date: Thu, 8 Sep 2016 12:49:09 -0400 Subject: [PATCH 1/2] Fix DataFrameClient tag processing - tags should be sorted, with both global_tags and column_tags together - tags values should be escaped --- influxdb/_dataframe_client.py | 23 +++++++++------ influxdb/tests/dataframe_client_test.py | 37 ++++++++++++++++++++++--- 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/influxdb/_dataframe_client.py b/influxdb/_dataframe_client.py index ddae0862..90b5a69f 100644 --- a/influxdb/_dataframe_client.py +++ b/influxdb/_dataframe_client.py @@ -8,6 +8,7 @@ from __future__ import unicode_literals import math +import re import pandas as pd @@ -26,6 +27,10 @@ def _pandas_time_unit(time_precision): return unit +def _escape_pandas_series(s): + return s.apply(lambda v: re.escape(v)) + + class DataFrameClient(InfluxDBClient): """ The ``DataFrameClient`` object holds information necessary to connect @@ -242,6 +247,12 @@ def _convert_dataframe_to_lines(self, field_columns = list(field_columns) if list(field_columns) else [] tag_columns = list(tag_columns) if list(tag_columns) else [] + # Make global_tags as tag_columns + if global_tags: + for tag in global_tags: + dataframe[tag] = global_tags[tag] + tag_columns.append(tag) + # If field columns but no tag columns, assume rest of columns are tags if field_columns and (not tag_columns): tag_columns = list(column_series[~column_series.isin( @@ -268,6 +279,7 @@ def _convert_dataframe_to_lines(self, # If tag columns exist, make an array of formatted tag keys and values if tag_columns: tag_df = dataframe[tag_columns] + tag_df = tag_df.sort_index(axis=1) tag_df = self._stringify_dataframe( tag_df, numeric_precision, datatype='tag') tags = (',' + ( @@ -286,15 +298,6 @@ def _convert_dataframe_to_lines(self, fields = field_df.sum(axis=1) del field_df - # Add any global tags to formatted tag strings - if global_tags: - global_tags = ','.join(['='.join([tag, global_tags[tag]]) - for tag in global_tags]) - if tag_columns: - tags = tags + ',' + global_tags - else: - tags = ',' + global_tags - # Generate line protocol string points = (measurement + tags + ' ' + fields + ' ' + time).tolist() return points @@ -344,6 +347,8 @@ def _stringify_dataframe(self, # If dealing with fields, format ints and strings correctly dataframe[int_columns] = dataframe[int_columns] + 'i' dataframe[string_columns] = '"' + dataframe[string_columns] + '"' + elif datatype == 'tag': + dataframe = dataframe.apply(_escape_pandas_series) dataframe.columns = dataframe.columns.astype(str) return dataframe diff --git a/influxdb/tests/dataframe_client_test.py b/influxdb/tests/dataframe_client_test.py index 0b3b9b90..782e5c82 100644 --- a/influxdb/tests/dataframe_client_test.py +++ b/influxdb/tests/dataframe_client_test.py @@ -108,10 +108,10 @@ def test_write_points_from_dataframe_with_tag_cols_and_global_tags(self): columns=["tag_one", "tag_two", "column_one", "column_two", "column_three"]) expected = ( - b"foo,tag_one=blue,tag_two=1,global_tag=value " + b"foo,global_tag=value,tag_one=blue,tag_two=1 " b"column_one=\"1\",column_two=1i,column_three=1.0 " b"0\n" - b"foo,tag_one=red,tag_two=0,global_tag=value " + b"foo,global_tag=value,tag_one=red,tag_two=0 " b"column_one=\"2\",column_two=2i,column_three=2.0 " b"3600000000000\n" ) @@ -155,10 +155,10 @@ def test_write_points_from_dataframe_with_tag_cols_and_defaults(self): ) expected_fields_no_tags = ( - b"foo,tag_one=blue,tag_two=1,tag_three=hot " + b"foo,tag_one=blue,tag_three=hot,tag_two=1 " b"column_one=\"1\",column_two=1i,column_three=1.0 " b"0\n" - b"foo,tag_one=red,tag_two=0,tag_three=cold " + b"foo,tag_one=red,tag_three=cold,tag_two=0 " b"column_one=\"2\",column_two=2i,column_three=2.0 " b"3600000000000\n" ) @@ -198,6 +198,35 @@ def test_write_points_from_dataframe_with_tag_cols_and_defaults(self): cli.write_points(dataframe, 'foo') self.assertEqual(m.last_request.body, expected_no_tags_no_fields) + def test_write_points_from_dataframe_with_tag_escaped(self): + now = pd.Timestamp('1970-01-01 00:00+00:00') + dataframe = pd.DataFrame( + data=[['blue', 1, "1", 1, 1.0, 'hot'], + ['red,green=orange', 0, "2", 2, 2.0, 'cold']], + index=[now, now + timedelta(hours=1)], + columns=["tag_one", "tag_two", "column_one", + "column_two", "column_three", + "tag_three"]) + + expected_escaped_tags = ( + b"foo,tag_one=blue " + b"column_one=\"1\",column_two=1i " + b"0\n" + b"foo,tag_one=red\\,green\\=orange " + b"column_one=\"2\",column_two=2i " + b"3600000000000\n" + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + cli = DataFrameClient(database='db') + cli.write_points(dataframe, 'foo', + field_columns=['column_one', 'column_two'], + tag_columns=['tag_one']) + self.assertEqual(m.last_request.body, expected_escaped_tags) + def test_write_points_from_dataframe_with_numeric_column_names(self): now = pd.Timestamp('1970-01-01 00:00+00:00') # df with numeric column names From 978b080e959403ab121f5c736eab6d5d5ea82e5c Mon Sep 17 00:00:00 2001 From: "tzonghao.chen" Date: Thu, 8 Sep 2016 17:29:21 -0400 Subject: [PATCH 2/2] Preserve previous tag escaping behavior `re.escape` affects all non-alphanumerical characters, as opposed to `line_protocol._escape_tag` which only escapes specific characters [\ ,=] --- influxdb/_dataframe_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/influxdb/_dataframe_client.py b/influxdb/_dataframe_client.py index 90b5a69f..0341a41d 100644 --- a/influxdb/_dataframe_client.py +++ b/influxdb/_dataframe_client.py @@ -8,11 +8,11 @@ from __future__ import unicode_literals import math -import re import pandas as pd from .client import InfluxDBClient +from .line_protocol import _escape_tag def _pandas_time_unit(time_precision): @@ -28,7 +28,7 @@ def _pandas_time_unit(time_precision): def _escape_pandas_series(s): - return s.apply(lambda v: re.escape(v)) + return s.apply(lambda v: _escape_tag(v)) class DataFrameClient(InfluxDBClient):