|
1 | 1 | import datetime
|
2 | 2 | import math
|
| 3 | +import threading |
3 | 4 | from collections import defaultdict
|
4 | 5 | from itertools import count
|
5 | 6 | from typing import Any
|
@@ -49,6 +50,63 @@ def aggregate(self) -> dict:
|
49 | 50 | return self.state
|
50 | 51 |
|
51 | 52 |
|
| 53 | +class UsageMultiSetCounter: |
| 54 | + """ |
| 55 | + Use this counter to count occurrences of unique values for multiple dimensions. |
| 56 | + This dynamically creates UsageSetCounters and should be used with care (i.e., with limited keys). |
| 57 | + Example: |
| 58 | + my_feature_counter = UsageMultiSetCounter("pipes:invocation") |
| 59 | + my_feature_counter.record("aws:sqs", "aws:lambda") |
| 60 | + my_feature_counter.record("aws:sqs", "aws:lambda") |
| 61 | + my_feature_counter.record("aws:sqs", "aws:stepfunctions") |
| 62 | + my_feature_counter.record("aws:kinesis", "aws:lambda") |
| 63 | + aggregate is implemented for each counter individually |
| 64 | + my_feature_counter.aggregate() is available for testing purposes: |
| 65 | + { |
| 66 | + "aws:sqs": { |
| 67 | + "aws:lambda": 2, |
| 68 | + "aws:stepfunctions": 1, |
| 69 | + }, |
| 70 | + "aws:kinesis": { |
| 71 | + "aws:lambda": 1 |
| 72 |
10000
+ } |
| 73 | + } |
| 74 | + """ |
| 75 | + |
| 76 | + namespace: str |
| 77 | + _counters: dict[str, UsageSetCounter] |
| 78 | + lock = threading.Lock() |
| 79 | + |
| 80 | + def __init__(self, namespace: str): |
| 81 | + self._counters = {} |
| 82 | + self.namespace = namespace |
| 83 | + |
| 84 | + def record(self, key: str, value: str): |
| 85 | + namespace = f"{self.namespace}:{key}" |
| 86 | + if namespace in self._counters: |
| 87 | + set_counter = self._counters[namespace] |
| 88 | + else: |
| 89 | + with self.lock: |
| 90 | + if namespace in self._counters: |
| 91 | + set_counter = self._counters[namespace] |
| 92 | + else: |
| 93 | + # We cannot use setdefault here because Python always instantiates a new UsageSetCounter, |
| 94 | + # which overwrites the collector_registry |
| 95 | + set_counter = UsageSetCounter(namespace) |
| 96 | + self._counters[namespace] = set_counter |
| 97 | + |
| 98 | + self._counters[namespace] = set_counter |
| 99 | + set_counter.record(value) |
| 100 | + |
| 101 | + def aggregate(self) -> dict: |
| 102 | + """aggregate is invoked on a per UsageSetCounter level because each counter is registered individually. |
| 103 | + This utility is only for testing!""" |
| 104 | + merged_dict = {} |
| 105 | + for namespace, counter in self._counters.items(): |
| 106 | + merged_dict[namespace] = counter.aggregate() |
| 107 | + return merged_dict |
| 108 | + |
| 109 | + |
52 | 110 | class UsageCounter:
|
53 | 111 | """
|
54 | 112 | Use this counter to count numeric values
|
|
0 commit comments