diff --git a/.gitignore b/.gitignore index 043223b4..fa59bd2a 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,5 @@ dist .coverage.* .coverage .tox +examples/loadtest/logs/* +examples/storage/* \ No newline at end of file diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 00000000..93323625 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,5 @@ +include setup.py README.rst MANIFEST.in LICENSE *.txt +recursive-include client_python/ * +graft tests +global-exclude *~ +recursive-exclude dist/* examples/* \ No newline at end of file diff --git a/README.md b/README.md index 8aa1d326..5b7efa63 100644 --- a/README.md +++ b/README.md @@ -458,3 +458,22 @@ for family in text_string_to_metric_families("my_gauge 1.0\n"): for sample in family.samples: print("Name: {0} Labels: {1} Value: {2}".format(*sample)) ``` + + +## UWSGI sharedarea MODE + +This mode enable uwsgi sharedarea memory to store all metrics and sync between processes. + +To enable mode, setup `PROMETHEUS_UWSGI_SHAREDAREA` environment variable with sharedarea id value. + +Example uwsgi config: + +``` ini + +[uwsgi] + +#... + +sharedarea=10 + +``` diff --git a/examples/Dockerfile b/examples/Dockerfile new file mode 100644 index 00000000..d9ed3b56 --- /dev/null +++ b/examples/Dockerfile @@ -0,0 +1,25 @@ +FROM python:2.7.8 + +ENV PYTHONUNBUFFERED=1 + +WORKDIR /usr/src/app + +RUN apt-get update && apt-get install -y \ + libxml2-dev libxslt-dev python-dev \ + libyaml-dev \ + graphviz + +COPY requirements.txt requirements.txt + +RUN pip install -U plop gprof2dot ipython +RUN pip install -r requirements.txt + +RUN mkdir /app-entrypoint.d + +#COPY app-entrypoint.sh / + +#ENTRYPOINT ["/app-entrypoint.sh"] + +ENTRYPOINT ["/bin/bash", "-c"] + +#CMD ["echo 'hello'"] diff --git a/examples/Makefile b/examples/Makefile new file mode 100644 index 00000000..66b49e4a --- /dev/null +++ b/examples/Makefile @@ -0,0 +1,22 @@ +current_dir = $(shell pwd) + +docker-clean: docker-clean-containers + +docker-clean-containers: + docker ps -q -f status=exited | xargs docker rm + +docker-build: + @echo "Run development services via docker" + sudo docker-compose -f compose-uwsgi.yml build --force-rm #--no-cache + +docker-stop: + @echo "Stop docker services" + sudo docker-compose -f compose-uwsgi.yml -f stop + +docker-start: + @echo "Run development services via docker" + docker-compose -f compose-uwsgi.yml up --force-recreate + +tank: + @echo "Tank load" + docker run --rm --net examples_default --link dev_flask_app:dev_flask_app -v $(current_dir)/loadtest/:/var/loadtest/ direvius/yandex-tank diff --git a/examples/app-entrypoint.sh b/examples/app-entrypoint.sh new file mode 100755 index 00000000..a6a6359e --- /dev/null +++ b/examples/app-entrypoint.sh @@ -0,0 +1,15 @@ +#!/bin/bash +set -e + +echo "Application entrypoint"; + +for f in /app-entrypoint.d/*; do + echo "File: $f"; + case "$f" in + *.sh) echo "$0: running $f"; . "$f" ;; + *) echo "$0: ignoring $f" ;; + esac + echo +done + +exec "$@"; diff --git a/examples/compose-uwsgi.yml b/examples/compose-uwsgi.yml new file mode 100644 index 00000000..b2c0c64c --- /dev/null +++ b/examples/compose-uwsgi.yml @@ -0,0 +1,24 @@ +version: "2.0" + +services: + dev_flask_app: + image: dev_flask_app:latest + build: + dockerfile: Dockerfile + context: . + environment: + - PROMETEUS_STATUS=1 + - prometheus_multiproc_dir=/usr/src/app/storage/prometheus/ + #- PROMETHEUS_UWSGI_SHAREDAREA=1 + - PYTHONUNBUFFERED=1 + + command: + - "rm -rf /usr/src/app/storage/prometheus/* && find /usr/src/app/ -name '*.pyc' -exec rm -f {} + && cd /prometheus_client && python /prometheus_client/setup.py install && cd /uwsgi_lib && python /uwsgi_lib/uwsgiconfig.py --build && cd /usr/src/app/ && /uwsgi_lib/uwsgi --ini=/usr/src/app/uwsgi.ini" + volumes: + - ./uwsgi.ini:/usr/src/app/uwsgi.ini + - ./:/usr/src/app/ + - ../:/prometheus_client + - /projects/libs/uwsgi:/uwsgi_lib + + ports: + - 8051:8051 diff --git a/examples/flask_app.py b/examples/flask_app.py new file mode 100644 index 00000000..a8a88eb8 --- /dev/null +++ b/examples/flask_app.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import os + +import prometheus_client +from flask import request, Response, Flask +from prometheus_client import Counter, Gauge, Summary, Histogram +from prometheus_client.core import CollectorRegistry +from string import ascii_letters +from random import choice + +REGISTRY = CollectorRegistry(auto_describe=False) + + +requests_total = Counter("app:requests_total", "Total count of requests", ["method", "url_rule", "env_role"], registry=REGISTRY) +inprogress_total = Gauge("app:inprogress_total", "Total count of requests in progress", ["method", "url_rule", "env_role"], registry=REGISTRY) +request_duration_summary_sec = Summary("app:request_duration_summary_sec", "Request duration in seconds", ["method", "url_rule", "env_role", "rnd"], registry=REGISTRY) +request_duration_historam_sec = Histogram("app:request_duration_histogram_sec", "Request duration in seconds", ["method", "url_rule", "env_role", "rnd"], registry=REGISTRY) + +random_counter = Counter("random_counter", "random counter", ["rnd"], registry=REGISTRY) + +APP_ENV_ROLE = os.environ.get('APP_ROLE', 'unknown') + +app = Flask(__name__) +app.debug = True + + +@app.route("/metrics") +def metrics(): + text = "# Process in {0}\n".format(os.getpid()) + + return Response(text + prometheus_client.generate_latest(REGISTRY), mimetype="text/plain") + + +@app.route('/') +@app.route('/') +def index(path='/'): + requests_total.labels(method=request.method, url_rule=path, env_role=APP_ENV_ROLE).inc() + + #requests_total.labels(method=request.method, url_rule=path, env_role=APP_ENV_ROLE).inc() + + rnd = ''.join([choice(ascii_letters) for x in xrange(10)]) + + text = "# Process in {0} rnd={1}\n".format(os.getpid(), rnd) + + #random_counter.labels(rnd=rnd).inc() + + #with request_duration_summary_sec.labels(method=request.method, url_rule=path, env_role=APP_ENV_ROLE, rnd=rnd).time():#, \ + #request_duration_historam_sec.labels(method=request.method, url_rule=path, env_role=APP_ENV_ROLE, rnd=rnd).time(): + + return Response(text, mimetype="text/plain") + +application = app +print("Debug app init") diff --git a/examples/loadtest/load.ini b/examples/loadtest/load.ini new file mode 100644 index 00000000..91bdf4bb --- /dev/null +++ b/examples/loadtest/load.ini @@ -0,0 +1,8 @@ +[phantom] +address=dev_flask_app:8051 ; +rps_schedule=const(100, 1m) ; +connection_test=0 ; + +ammofile=/var/loadtest/uris.txt +ammo_type=uri +writelog=1 \ No newline at end of file diff --git a/examples/loadtest/uris.txt b/examples/loadtest/uris.txt new file mode 100644 index 00000000..c045b912 --- /dev/null +++ b/examples/loadtest/uris.txt @@ -0,0 +1,20 @@ +[Connection: close] +[Host: dev_flask_app] +[Cookie: None] + +/api/v +/business/ +/auto/ +/starlife/ +/politics/ +/sport/ +/kids/ +/lifestyle/ +/incidents/ +/9may/ +/health/ +/newyear/ +/kinomusic/ +/games/ +/scitech/ +/ diff --git a/examples/requirements.txt b/examples/requirements.txt new file mode 100644 index 00000000..89c28265 --- /dev/null +++ b/examples/requirements.txt @@ -0,0 +1,3 @@ +Flask==0.10.1 +#git+https://github.com/Lispython/client_python.git +#uwsgi==2.0.14 diff --git a/examples/uwsgi.ini b/examples/uwsgi.ini new file mode 100644 index 00000000..ba6572ad --- /dev/null +++ b/examples/uwsgi.ini @@ -0,0 +1,21 @@ +[uwsgi] +chdir=/usr/src/app/ +env = APP_ROLE=dev_uwsgi +wsgi-file = /usr/src/app/flask_app.py +master=True +vacuum=True +max-requests=5000 +harakiri=120 +post-buffering=65536 +workers=1 +#enable-threads=True +#listen=4000 +# socket=0.0.0.0:8997 +stats=/tmp/uwsgi-app.stats +#logger=syslog:uwsgi_app_stage,local0 +buffer-size=65536 +http = 0.0.0.0:8051 +thunder-lock=True + +sharedarea=1000 +sharedarea=1000 \ No newline at end of file diff --git a/prometheus_client/core.py b/prometheus_client/core.py index 10fb8646..8a82e569 100644 --- a/prometheus_client/core.py +++ b/prometheus_client/core.py @@ -11,6 +11,14 @@ import struct import time import types +import uuid +from logging import getLogger +from contextlib import contextmanager + +try: + import uwsgi +except ImportError: + uwsgi = None try: from BaseHTTPServer import BaseHTTPRequestHandler @@ -31,6 +39,11 @@ _INITIAL_MMAP_SIZE = 1024*1024 +logger = getLogger('prometheus_client') + +class InvalidUWSGISharedareaPagesize(Exception): + pass + class CollectorRegistry(object): '''Metric collector registry. @@ -93,6 +106,7 @@ def collect(self): collectors = None with self._lock: collectors = copy.copy(self._collector_to_names) + for collector in collectors: for metric in collector.collect(): yield metric @@ -309,6 +323,339 @@ def get(self): with self._lock: return self._value + + +class _UWSGISharedareaDict(object): + """A dict of doubles, backend by uwsgi sharedarea + """ + + SHAREDAREA_ID = int(os.environ.get('PROMETHEUS_UWSGI_SHAREDAREA', 0)) + KEY_SIZE_SIZE = 4 + KEY_VALUE_SIZE = 8 + SIGN_SIZE = 10 + AREA_SIZE_SIZE = 4 + SIGN_POSITION = 4 + AREA_SIZE_POSITION = 0 + + def __init__(self): + self._used = None + # Changed every time then keys added + self._sign = None + self._positions = {} + self._rlocked = False + self._wlocked = False + self._m = uwsgi.sharedarea_memoryview(self.SHAREDAREA_ID) + self.init_memory() + + @property + def m(self): + return self._m + + @property + def wlocked(self): + return self._wlocked + + @wlocked.setter + def wlocked(self, value): + self._wlocked = value + return self._wlocked + + @property + def rlocked(self): + return self._rlocked + + @rlocked.setter + def rlocked(self, value): + self._rlocked = value + return self._rlocked + + def get_area_size_with_lock(self): + with self.lock(): + return self.get_area_size() + + def get_slice(self, start, size): + return slice(start, start+size) + + def get_area_size(self): + """Read area size from uwsgi + """ + return struct.unpack(b'i', self.m[self.get_slice(self.AREA_SIZE_POSITION, self.AREA_SIZE_SIZE)])[0] + + def init_area_size(self): + return self.update_area_size(self.AREA_SIZE_SIZE) + + def update_area_size(self, size): + self._used = size + self.m[self.get_slice(self.AREA_SIZE_POSITION, self.AREA_SIZE_SIZE)] = struct.pack(b'i', size) + return True + + def update_area_sign(self): + self._sign = os.urandom(self.SIGN_SIZE) + self.m[self.get_slice(self.SIGN_POSITION, self.SIGN_SIZE)] = self._sign + + + def get_area_sign(self): + """Get current area sign from memory + """ + return self.m[self.get_slice(self.SIGN_POSITION, self.SIGN_SIZE)].tobytes() + + def init_memory(self): + """Initialize default memory addresses + """ + with self.lock(): + if self._used is None: + self._used = self.get_area_size() + + if self._used == 0: + self.update_area_sign() + self.update_area_size(self.SIGN_SIZE + self.AREA_SIZE_SIZE) + + self.validate_actuality() + + + def read_memory(self): + """Read all keys from sharedared + """ + pos = self.AREA_SIZE_POSITION + self.AREA_SIZE_SIZE + self.SIGN_SIZE + self._used = self.get_area_size() + self._sign = self.get_area_sign() + + while pos < self._used + self.AREA_SIZE_POSITION: + + key_size, (key, key_value), positions = self.read_item(pos) + yield key_size, (key, key_value), positions + pos = positions[3] + + def load_exists_positions(self): + """Load all keys from memory + """ + + self._used = self.get_area_size() + self._sign = self.get_area_sign() + + for _, (key, _), positions in self.read_memory(): + self._positions[key] = positions + + + def get_string_padding(self, key): + """Calculate string padding + + :param key: encoded string + """ + return (8 - (len(key) + 4) % 8) + + def init_key(self, key): + """Initialize memory for key + + :param key: key string + """ + encoded = key.encode('utf-8') + + # Pad to be 8-byte aligned + padding = self.get_string_padding(encoded) + key_size = len(encoded) + self.get_string_padding(encoded) + + item_template = 'i{0}sd'.format(key_size).encode() + + value = struct.pack(item_template, len(encoded), encoded, 0.0) + + key_string_position = self._used + self.AREA_SIZE_POSITION + + self.m[self.get_slice(self._used + self.AREA_SIZE_POSITION, len(value))] = value + + self.update_area_size(self._used + len(value)) + self._positions[key] = [key_string_position, key_string_position + self.KEY_SIZE_SIZE, + self._used - self.KEY_VALUE_SIZE, self._used+self.AREA_SIZE_POSITION] + self.update_area_sign() + return self._positions[key] + + def read_key_string(self, position, size): + """Read key value from position by given size + + :param position: int offset for key string + :param size: int key size in bytes to read + """ + key_string_bytes = self.m[self.get_slice(position, size)] + return struct.unpack(b'{0}s'.format(size), key_string_bytes)[0] + + def read_key_value(self, position): + """Read float value of position + + :param position: int offset for key value float + """ + key_value_bytes = self.m[self.get_slice(position, self.KEY_VALUE_SIZE)] + return struct.unpack(b'd', key_value_bytes)[0] + + def read_key_size(self, position): + """Read key size from position + + :param position: int offset for 4-byte key size + """ + key_size_bytes = self.m[self.get_slice(position, self.KEY_SIZE_SIZE)] + return struct.unpack(b'i', key_size_bytes)[0] + + def write_key_value(self, position, value): + """Write float value to position + + :param position: int offset for 8-byte float value + """ + self.m[self.get_slice(position, self.KEY_VALUE_SIZE)] = struct.pack(b'd', value) + return True + + def read_item(self, position): + """Read key info from given position + + 4 bytes int key size + n bytes key value of utf-8 encoded string key padding to a 8 byte + 8 bytes float counter value + """ + + key_size = self.read_key_size(position) + + key_string_position = position + self.KEY_SIZE_SIZE + key = self.read_key_string(key_string_position, key_size) + + key_value_position = key_string_position + self.get_string_padding(key.encode('utf-8')) + key_size + + key_value = self.read_key_value(key_value_position) + + return (key_size, + (key, key_value), + (position, key_string_position, + key_value_position, key_value_position + self.KEY_VALUE_SIZE)) + + def get_key_position(self, key): + return self._positions.get(key, None) or self.init_key(key) + + def inc_value(self, key, value): + """Increase/decrease key value + + :param key: key string + :param value: key value + """ + with self.lock(): + try: + self.validate_actuality() + position = self.get_key_position(key)[2] + return self.write_key_value(position, self.read_key_value(position) + value) + except InvalidUWSGISharedareaPagesize as e: + logger.error("Invalid sharedarea pagesize {0} bytes".format(len(self._m))) + return 0 + + def write_value(self, key, value): + """Write value to shared memory + + :param key: key string + :param value: key value + """ + with self.lock(): + try: + self.validate_actuality() + return self.write_key_value(self.get_key_position(key)[2], value) + except InvalidUWSGISharedareaPagesize as e: + logger.error("Invalid sharedarea pagesize {0} bytes".format(len(self._m))) + return None + + def read_value(self, key): + """Read value from shared memory + + :param key: key string + """ + with self.lock(): + try: + self.validate_actuality() + return self.read_key_value(self.get_key_position(key)[2]) + except InvalidUWSGISharedareaPagesize: + logger.error("Invalid sharedarea pagesize {0} bytes".format(len(self._m))) + return 0 + + def validate_actuality(self): + """For prevent data corruption + + Reload data from sharedmemory into process if sign changed + """ + if self._sign != self.get_area_sign(): + self.load_exists_positions() + + @contextmanager + def lock(self): + lock_id = uuid.uuid4().hex + if not self.wlocked and not self.rlocked: + self.wlocked, self.rlocked = lock_id, lock_id + uwsgi.sharedarea_wlock(self.SHAREDAREA_ID) + yield + uwsgi.sharedarea_unlock(self.SHAREDAREA_ID) + self.wlocked, self.rlocked = False, False + else: + yield + + @contextmanager + def rlock(self): + lock_id = uuid.uuid4().hex + if not self.rlocked: + self.rlocked = lock_id + uwsgi.sharedarea_rlock(self.SHAREDAREA_ID) + yield + uwsgi.sharedarea_unlock(self.SHAREDAREA_ID) + self.rlocked = False + else: + yield + + def unlock(self): + self._wlocked, self._rlocked = False, False + uwsgi.sharedarea_unlock(self.SHAREDAREA_ID) + + +def _UWSGISharedareaInit(): + KEYS_CACHE = {} + + STORAGE = _UWSGISharedareaDict() + + class _UWSGISharedareaValue(object): + """ + A float protected by uwsgi sharedarea pages + """ + _multiprocess = False + + def __init__(self, metric_type, metric_name, name, labelnames, labelvalues, **kwargs): + self._metric_type = metric_type + self._metric_name = metric_name + self._name = name + self._labelnames = labelnames + self._labelvalues = labelvalues + self._key = self._get_key((metric_name, name, tuple(labelnames), tuple(labelvalues))) + self._storage = STORAGE + + def inc(self, amount): + """increase key value + + :param amount: + """ + self._storage.inc_value(self._key, amount) + + def set(self, value): + """Set value for key + + :param value: + """ + return self._storage.write_value(self._key, value) + + def get(self): + """Get key value + """ + return self._storage.read_value(self._key) + + def _get_key(self, values): + try: + return KEYS_CACHE[values] + except KeyError: + pass + KEYS_CACHE[values] = json.dumps(values) + return KEYS_CACHE[values] + + return _UWSGISharedareaValue + + class _MmapedDict(object): """A dict of doubles, backed by an mmapped file. @@ -339,6 +686,9 @@ def _init_value(self, key): """Initilize a value. Lock must be held by caller.""" encoded = key.encode('utf-8') # Pad to be 8-byte aligned. + # pseudo-code, see actual code below + # padding = (align - (offset mod align)) mod align + # new offset = offset + padding = offset + (align - (offset mod align)) mod align padded = encoded + (b' ' * (8 - (len(encoded) + 4) % 8)) value = struct.pack('i{0}sd'.format(len(padded)).encode(), len(encoded), padded, 0.0) while self._used + len(value) > self._capacity: @@ -392,6 +742,7 @@ def close(self): self._f = None + def _MultiProcessValue(__pid=os.getpid()): pid = __pid files = {} @@ -438,7 +789,11 @@ def get(self): # This needs to be chosen before the first metric is constructed, # and as that may be in some arbitrary library the user/admin has # no control over we use an enviroment variable. -if 'prometheus_multiproc_dir' in os.environ: +if "PROMETHEUS_UWSGI_SHAREDAREA" in os.environ: + if not uwsgi: + raise RuntimeError("uwsgi not found") + _ValueClass = _UWSGISharedareaInit() +elif 'prometheus_multiproc_dir' in os.environ: _ValueClass = _MultiProcessValue() else: _ValueClass = _MutexValue diff --git a/setup.py b/setup.py index ac41ede7..710c29b9 100644 --- a/setup.py +++ b/setup.py @@ -2,8 +2,8 @@ from setuptools import setup setup( - name = "prometheus_client", - version = "0.0.18", + name = "ra_prometheus_client", + version = "0.1.0", author = "Brian Brazil", author_email = "brian.brazil@robustperception.io", description = ("Python client for the Prometheus monitoring system."), @@ -11,7 +11,9 @@ license = "Apache Software License 2.0", keywords = "prometheus monitoring instrumentation client", url = "https://github.com/prometheus/client_python", - packages=['prometheus_client', 'prometheus_client.bridge', 'prometheus_client.twisted'], + packages=['prometheus_client', + 'prometheus_client.bridge', + 'prometheus_client.twisted'], extras_require={ 'twisted': ['twisted'], }, @@ -24,7 +26,7 @@ "Programming Language :: Python", "Programming Language :: Python :: 2", "Programming Language :: Python :: 2.6", - "Programming Language :: Python :: 2.7", +"Programming Language :: Python :: 2.7", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.4", "Programming Language :: Python :: 3.5",