8000 initial implementation of line protocol · jimmy777/influxdb-python@078350e · GitHub
[go: up one dir, main page]

Skip to content
8000

Commit 078350e

Browse files
committed
initial implementation of line protocol
as introduced in influxdata/influxdb#2696
1 parent 3e1a03c commit 078350e

File tree

3 files changed

+114
-16
lines changed
  • influxdb
  • 3 files changed

    +114
    -16
    lines changed

    influxdb/client.py

    Lines changed: 8 additions & 15 deletions
    Original file line numberDiff line numberDiff line change
    @@ -11,6 +11,7 @@
    1111
    import requests.exceptions
    1212
    from sys import version_info
    1313

    14+
    from influxdb.line_protocol import make_lines
    1415
    from influxdb.resultset import ResultSet
    1516

    1617
    try:
    @@ -221,14 +222,7 @@ def request(self, url, method='GET', params=None, data=None,
    221222
    if params is None:
    222223
    params = {}
    223224

    224-
    auth = {
    225-
    'u': self._username,
    226-
    'p': self._password
    227-
    }
    228-
    229-
    params.update(auth)
    230-
    231-
    if data is not None and not isinstance(data, str):
    225+
    if isinstance(data, dict) or isinstance(data, list):
    232226
    data = json.dumps(data)
    233227

    234228
    # Try to send the request a maximum of three times. (see #103)
    @@ -238,6 +232,7 @@ def request(self, url, method='GET', params=None, data=None,
    238232
    response = self._session.request(
    239233
    method=method,
    240234
    url=url,
    235+
    auth=(self._username, self._password),
    241236
    params=params,
    242237
    data=data,
    243238
    headers=self._headers,
    @@ -270,10 +265,10 @@ def write(self, data, params=None, expected_response_code=204):
    270265
    :rtype: bool
    271266
    """
    272267
    self.request(
    273-
    url="write",
    268+
    url="write_points",
    274269
    method='POST',
    275270
    params=params,
    276-
    data=data,
    271+
    data=make_lines(data).encode('utf-8'),
    277272
    expected_response_code=expected_response_code
    278273
    )
    279274
    return True
    @@ -396,13 +391,12 @@ def _write_points(self,
    396391
    if tags:
    397392
    data['tags'] = tags
    398393

    399-
    data['database'] = database or self._database
    400-
    401394
    if self.use_udp:
    402395
    self.send_packet(data)
    403396
    else:
    404397
    self.write(
    405398
    data=data,
    399+
    params={'db': database or self._database},
    406400
    expected_response_code=204
    407401
    )
    408402

    @@ -679,9 +673,8 @@ def send_packet(self, packet):
    679673
    :param packet: the packet to be sent
    680674
    :type packet: dict
    681675
    """
    682-
    data = json.dumps(packet)
    683-
    byte = data.encode('utf-8')
    684-
    self.udp_socket.sendto(byte, (self._host, self.udp_port))
    676+
    data = make_lines(packet).encode('utf-8')
    677+
    self.udp_socket.sendto(data, (self._host, self.udp_port))
    685678

    686679

    687680
    class InfluxDBClusterClient(object):

    influxdb/line_protocol.py

    Lines changed: 103 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -0,0 +1,103 @@
    1+
    # -*- coding: utf-8 -*-
    2+
    from __future__ import unicode_literals
    3+
    4+
    from copy import copy
    5+
    from datetime import datetime
    6+
    from time import mktime
    7+
    8+
    from dateutil.parser import parse
    9+
    from pytz import utc
    10+
    from six import binary_type, text_type
    11+
    12+
    13+
    def _convert_timestamp(timestamp):
    14+
    if isinstance(timestamp, int):
    15+
    return timestamp
    16+
    if isinstance(_force_text(timestamp), text_type):
    17+
    timestamp = parse(timestamp)
    18+
    if isinstance(timestamp, datetime):
    19+
    if timestamp.tzinfo:
    20+
    timestamp = timestamp.astimezone(utc)
    21+
    timestamp.replace(tzinfo=None)
    22+
    return (
    23+
    mktime(timestamp.timetuple()) * 1e9 +
    24+
    timestamp.microsecond * 1e3
    25+
    )
    26+
    raise ValueError(timestamp)
    27+
    28+
    29+
    def _escape_tag(tag):
    30+
    return tag.replace(
    31+
    "\\", "\\\\"
    32+
    ).replace(
    33+
    " ", "\\ "
    34+
    ).replace(
    35+
    ",", "\\,"
    36+
    ).replace(
    37+
    "=", "\\="
    38+
    )
    39+
    40+
    41+
    def _escape_value(value):
    42+
    value = _force_text(value)
    43+
    if isinstance(value, text_type):
    44+
    return "\"{}\"".format(value.replace(
    45+
    "\"", "\\\""
    46+
    ))
    47+
    else:
    48+
    return str(value)
    49+
    50+
    51+
    def _force_text(data):
    52+
    """
    53+
    Try to return a text aka unicode object from the given data.
    54+
    """
    55+
    if isinstance(data, binary_type):
    56+
    return data.decode('utf-8', 'replace')
    57+
    else:
    58+
    return data
    59+
    60+
    61+
    def make_lines(data):
    62+
    """
    63+
    Extracts the points from the given dict and returns a Unicode string
    64+
    matching the line protocol introduced in InfluxDB 0.9.0.
    65+
    """
    66+
    lines = ""
    67+
    static_tags = data.get('tags', None)
    68+
    for point in data['points']:
    69+
    # add measurement name
    70+
    lines += _escape_tag(_force_text(
    71+
    point.get('measurement', data.get('measurement'))
    72+
    )) + ","
    73+
    74+
    # add tags
    75+
    if static_tags is None:
    76+
    tags = point.get('tags', {})
    77+
    else:
    78+
    tags = copy(static_tags)
    79+
    tags.update(point.get('tags', {}))
    80+
    # tags should be sorted client-side to take load off server
    81+
    for tag_key in sorted(tags.keys()):
    82+
    lines += "{key}={value},".format(
    83+
    key=_escape_tag(tag_key),
    84+
    value=_escape_tag(tags[tag_key]),
    85+
    )
    86+
    lines = lines[:-1] + " " # strip the trailing comma
    87+
    88+
    # add fields
    89+
    for field_key in sorted(point['fields'].keys()):
    90+
    lines += "{key}={value},".format(
    91+
    key=_escape_tag(field_key),
    92+
    value=_escape_value(point['fields'][field_key]),
    93+
    )
    94+
    lines = lines[:-1] # strip the trailing comma
    95+
    96+
    # add timestamp
    97+
    if 'timestamp' in point:
    98+
    lines += " " + _force_text(str(int(
    99+
    _convert_timestamp(point['timestamp'])
    100+
    )))
    101+
    102+
    lines += "\n"
    103+
    return lines

    requirements.txt

    Lines changed: 3 additions & 1 deletion
    Original file line numberDiff line numberDiff line change
    @@ -1,2 +1,4 @@
    1+
    python-dateutil>=2.0.0
    2+
    pytz
    13
    requests>=1.0.3
    2-
    six==1.9.0
    4+
    six==1.9.0

    0 commit comments

    Comments
     (0)
    0