8000 initial support for put_metric_filter for CloudWatch log events · riddopic/localstack@2e7b26d · GitHub
[go: up one dir, main page]

Skip to content

Commit 2e7b26d

Browse files
committed
initial support for put_metric_filter for CloudWatch log events
1 parent b1dc51b commit 2e7b26d

File tree

4 files changed

+159
-30
lines changed

4 files changed

+159
-30
lines changed

localstack/services/logs/logs_listener.py

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,27 @@
11
import re
2+
import json
3+
import logging
24
from requests.models import Request
3-
from localstack.utils.common import to_str
5+
from localstack.utils.aws import aws_stack
6+
from localstack.utils.common import to_str, is_number
47
from localstack.constants import APPLICATION_AMZ_JSON_1_1, TEST_AWS_ACCOUNT_ID
58
from localstack.services.generic_proxy import ProxyListener
69

10+
LOG = logging.getLogger(__name__)
11+
712

813
class ProxyListenerCloudWatchLogs(ProxyListener):
914
def forward_request(self, method, path, data, headers):
15+
action = headers.get('X-Amz-Target') or ''
16+
action = action.split('.')[-1]
17+
18+
if action == 'PutLogEvents':
19+
publish_log_metrics_for_events(data)
1020
if method == 'POST' and path == '/':
1121
if 'nextToken' in to_str(data or ''):
1222
data = self._fix_next_token_request(data)
1323
headers['Content-Length'] = str(len(data))
1424
return Request(data=data, headers=headers, method=method)
15-
1625
return True
1726

1827
def return_response(self, method, path, data, headers, response):
@@ -43,5 +52,41 @@ def _fix_next_token_response(response):
4352
response._content = re.sub(pattern, replacement, to_str(response.content))
4453

4554

55+
def publish_log_metrics_for_events(data):
56+
""" Filter and publish log metrics for matching events """
57+
from moto.logs.models import logs_backends # TODO: create separate RegionBackend class to store state
58+
data = data if isinstance(data, dict) else json.loads(data)
59+
log_events = data.get('logEvents') or []
60+
logs_backend = logs_backends[aws_stack.get_region()]
61+
metric_filters = logs_backend.metric_filters = getattr(logs_backend, 'metric_filters', [])
62+
client = aws_stack.connect_to_service('cloudwatch')
63+
for metric_filter in metric_filters:
64+
pattern = metric_filter.get('filterPattern', '')
65+
if log_events_match_filter_pattern(pattern, log_events):
66+
for tf in metric_filter.get('metricTransformations', []):
67+
value = tf.get('metricValue') or '1'
68+
if '$size' in value:
69+
LOG.info('Expression not yet supported for log filter metricValue: %s' % value)
70+
value = float(value) if is_number(value) else 1
71+
data = [{'MetricName': tf['metricName'], 'Value': value}]
72+
try:
73+
client.put_metric_data(Namespace=tf['metricNamespace'], MetricData=data)
74+
except Exception as e:
75+
LOG.info('Unable to put metric data for matching CloudWatch log events: %s' % e)
76+
77+
78+
def log_events_match_filter_pattern(filter_pattern, log_events):
79+
def matches(event):
80+
# TODO: implement full support for filter pattern expressions:
81+
# https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
82+
return re.match(filter_pattern, event.get('message') or '')
83+
filter_pattern = (filter_pattern or '').strip() or '*'
84+
filter_pattern = filter_pattern.replace('*', '.*')
85+
log_events = log_events if isinstance(log_events, list) else [log_events]
86+
for event in log_events:
87+
if matches(event):
88+
return True
89+
90+
4691
# instantiate listener
4792
UPDATE_LOGS = ProxyListenerCloudWatchLogs()

localstack/services/logs/logs_starter.py

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,11 @@
55
from localstack import config
66
from localstack.utils.aws import aws_stack
77
from moto.awslambda import models as lambda_models
8-
from moto.logs import models as logs_models
9-
from moto.logs.models import LogStream
8+
from moto.logs import models as logs_models, responses as logs_responses
109
from moto.core.utils import unix_time_millis
10+
from moto.logs.models import LogStream
11+
from moto.logs.exceptions import ResourceNotFoundException, InvalidParameterException
1112
from localstack.services.infra import start_moto_server
12-
from moto.logs.exceptions import (
13-
ResourceNotFoundException,
14-
InvalidParameterException,
15-
)
1613

1714

1815
def patch_lambda():
@@ -146,6 +143,38 @@ def put_log_events_model(self, log_group_name, log_stream_name, log_events, sequ
146143
for logs_backend in logs_models.logs_backends.values():
147144
logs_backend.put_subscription_filter = patch_put_subscription_filter(logs_backend)
148145

146+
def put_metric_filter(self):
147+
data = dict(self.request_params)
148+
metric_filters = self.logs_backend.metric_filters = getattr(self.logs_backend, 'metric_filters', [])
149+
metric_filters.append(data)
150+
return json.dumps({})
151+
152+
if not hasattr(logs_responses.LogsResponse, 'put_metric_filter'):
153+
logs_responses.LogsResponse.put_metric_filter = put_metric_filter
154+
155+
def describe_metric_filters(self):
156+
log_group_name = self._get_param('logGroupName')
157+
name_prefix = self._get_param('filterNamePrefix') or ''
158+
metric_filters = self.logs_backend.metric_filters = getattr(self.logs_backend, 'metric_filters', [])
159+
metric_filters = [mf for mf in metric_filters if log_group_name in (None, mf['logGroupName'])]
160+
metric_filters = [mf for mf in metric_filters if mf['filterName'].startswith(name_prefix)]
161+
result = {'metricFilters': metric_filters}
162+
return json.dumps(result)
163+
164+
if not hasattr(logs_responses.LogsResponse, 'describe_metric_filters'):
165+
logs_responses.LogsResponse.describe_metric_filters = describe_metric_filters
166+
167+
def delete_metric_filter(self):
168+
log_group_name = self._get_param('logGroupName')
169+
filter_name = self._get_param('filterName')
170+
metric_filters = self.logs_backend.metric_filters = getattr(self.logs_backend, 'metric_filters', [])
171+
self.logs_backend.metric_filters = [mf for mf in metric_filters
172+
if mf['filterName'] != filter_name or mf['logGroupName'] != log_group_name]
173+
return json.dumps({})
174+
175+
if not hasattr(logs_responses.LogsResponse, 'delete_metric_filter'):
176+
logs_responses.LogsResponse.delete_metric_filter = delete_metric_filter
177+
149178

150179
def start_cloudwatch_logs(port=None, asynchronous=False, update_listener=None):
151180
port = port or config.PORT_LOGS

tests/integration/test_logs.py

Lines changed: 61 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# -*- coding: utf-8 -*-
22
import unittest
3+
from datetime import datetime, timedelta
34
from localstack.constants import APPLICATION_AMZ_JSON_1_1
45
from localstack.utils.aws import aws_stack
56
from localstack.utils import testutil
@@ -19,15 +20,11 @@ def test_put_events_multi_bytes_msg(self):
1920

2021
groups_before = len(self.logs_client.describe_log_groups()['logGroups'])
2122

22-
response = self.logs_client.create_log_group(logGroupName=group)
23-
self.assertEqual(response['ResponseMetadata']['HTTPStatusCode'], 200)
23+
response = self.create_log_group_and_stream(group, stream)
2424

2525
groups_after = len(self.logs_client.describe_log_groups()['logGroups'])
2626
self.assertEqual(groups_after, groups_before + 1)
2727

28-
response = self.logs_client.create_log_stream(logGroupName=group, logStreamName=stream)
29-
self.assertEqual(response['ResponseMetadata']['HTTPStatusCode'], 200)
30-
3128
# send message with non-ASCII (multi-byte) chars
3229
body_msg = '🙀 - 参よ - 日本語'
3330
events = [{
@@ -49,8 +46,7 @@ def test_filter_log_events_response_header(self):
4946
group = 'lg-%s' % short_uid()
5047
stream = 'ls-%s' % short_uid()
5148

52-
self.logs_client.create_log_group(logGroupName=group)
53-
self.logs_client.create_log_stream(logGroupName=group, logStreamName=stream)
49+
self.create_log_group_and_stream(group, stream)
5450

5551
events = [
5652
{'timestamp': 1585902800, 'message': 'log message 1'},
@@ -78,7 +74,6 @@ def test_list_tags_log_group(self):
7874
'env': 'testing1'
7975
}
8076
)
81-
8277
rs = self.logs_client.list_tags_log_group(
8378
logGroupName=group
8479
)
@@ -150,13 +145,7 @@ def test_put_subscription_filter_firehose(self):
150145
)
151146
firehose_arn = response['DeliveryStreamARN']
152147

153-
self.logs_client.create_log_group(
154-
logGroupName=log_group
155-
)
156-
self.logs_client.create_log_stream(
157-
logGroupName=log_group,
158-
logStreamName=log_stream
159-
)
148+
self.create_log_group_and_stream(log_group, log_stream)
160149

161150
self.logs_client.put_subscription_filter(
162151
logGroupName=log_group,
@@ -208,13 +197,7 @@ def test_put_subscription_filter_kinesis(self):
208197

209198
kinesis_client = aws_stack.connect_to_service('kinesis')
210199

211-
self.logs_client.create_log_group(
212-
logGroupName=log_group
213-
)
214-
self.logs_client.create_log_stream(
215-
logGroupName=log_group,
216-
logStreamName=log_stream
217-
)
200+
self.create_log_group_and_stream(log_group, log_stream)
218201

219202
kinesis_client.create_stream(
220203
StreamName=kinesis,
@@ -267,3 +250,59 @@ def put_event():
267250
StreamName=kinesis,
268251
EnforceConsumerDeletion=True
269252
)
253+
254+
def test_metric_filters(self):
255+
log_group = 'g-%s' % short_uid()
256+
log_stream = 's-%s' % short_uid()
257+
filter_name = 'f-%s' % short_uid()
258+
metric_ns = 'ns-%s' % short_uid()
259+
metric_name = 'metric1'
260+
transforms = {
261+
'metricNamespace': metric_ns,
262+
'metricName': metric_name,
263+
'metricValue': '1',
264+
'defaultValue': 123
265+
}
266+
result = self.logs_client.put_metric_filter(logGroupName=log_group,
267+
filterName=filter_name, filterPattern='*', metricTransformations=[transforms])
268+
self.assertEqual(result['ResponseMetadata']['HTTPStatusCode'], 200)
269+
270+
result = self.logs_client.describe_metric_filters(logGroupName=log_group, filterNamePrefix='f-')
271+
self.assertEqual(result['ResponseMetadata']['HTTPStatusCode'], 200)
272+
result = [mf for mf in result['metricFilters'] if mf['filterName'] == filter_name]
273+
self.assertEqual(len(result), 1)
274+
275+
# put log events and assert metrics being published
276+
events = [
277+
{'timestamp': 1585902800, 'message': 'log message 1'},
278+
{'timestamp': 1585902961, 'message': 'log message 2'}
279+
]
280+
self.create_log_group_and_stream(log_group, log_stream)
281+
self.logs_client.put_log_events(logGroupName=log_group, logStreamName=log_stream, logEvents=events)
282+
283+
# Get metric data
284+
cw_client = aws_stack.connect_to_service('cloudwatch')
285+
metric_data = cw_client.get_metric_data(
286+
MetricDataQueries=[{'Id': 'q1', 'MetricStat': {'Metric':
287+
{'Namespace': metric_ns, 'MetricName': metric_name}, 'Period': 60, 'Stat': 'Sum'}}],
288+
StartTime=datetime.utcnow() - timedelta(hours=1),
289+
EndTime=datetime.utcnow(),
290+
)['MetricDataResults']
291+
self.assertEquals(len(metric_data), 1)
292+
self.assertEquals(metric_data[0]['Values'], [1])
293+
self.assertEquals(metric_data[0]['StatusCode'], 'Complete')
294+
295+
# delete filters
296+
result = self.logs_client.delete_metric_filter(logGroupName=log_group, filterName=filter_name)
297+
self.assertEqual(result['ResponseMetadata']['HTTPStatusCode'], 200)
298+
299+
result = self.logs_client.describe_metric_filters(logGroupName=log_group, filterNamePrefix='f-')
300+
self.assertEqual(result['ResponseMetadata']['HTTPStatusCode'], 200)
301+
result = [mf for mf in result['metricFilters'] if mf['filterName'] == filter_name]
302+
self.assertEqual(len(result), 0)
303+
304+
def create_log_group_and_stream(self, log_group, log_stream):
305+
response = self.logs_client.create_log_group(logGroupName=log_group)
306+
self.assertEqual(response['ResponseMetadata']['HTTPStatusCode'], 200)
307+
response = self.logs_client.create_log_stream(logGroupName=log_group, logStreamName=log_stream)
308+
self.assertEqual(response['ResponseMetadata']['HTTPStatusCode'], 200)

tests/unit/test_logs.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import unittest
2+
from localstack.services.logs.logs_listener import log_events_match_filter_pattern
3+
4+
5+
class CloudWatchLogsTest(unittest.TestCase):
6+
7+
def test_filter_expressions(self):
8+
9+
def assert_match(pattern, log_events, expected):
10+
result = log_events_match_filter_pattern(pattern, log_events)
11+
self.assertTrue(result) if expected else self.assertFalse(result)
12+
13+
log_events = [{'message': 'test123'}, {'message': 'foo bar 456'}]
14+
assert_match('*', log_events, True)
15+
assert_match('', log_events, True)
16+
assert_match('INVALID', log_events, False)

0 commit comments

Comments
 (0)
0