|
1 | 1 | import base64
|
| 2 | +import json |
2 | 3 | import logging
|
3 | 4 | import os
|
4 | 5 | import re
|
|
7 | 8 | from collections import defaultdict
|
8 | 9 | from functools import lru_cache
|
9 | 10 | from io import BytesIO
|
10 |
| -from typing import Any, Dict, List, Optional, Union |
| 11 | +from typing import Any, Dict, List, Optional, TypedDict, Union |
11 | 12 |
|
12 | 13 | from flask import Response
|
13 | 14 |
|
@@ -300,3 +301,143 @@ def generate_lambda_arn(
|
300 | 301 | return f"arn:aws:lambda:{region}:{account_id}:function:{fn_name}:{qualifier}"
|
301 | 302 | else:
|
302 | 303 | return f"arn:aws:lambda:{region}:{account_id}:function:{fn_name}"
|
| 304 | + |
| 305 | + |
| 306 | +class FilterCriteria(TypedDict): |
| 307 | + Filters: List[Dict[str, any]] |
| 308 | + |
| 309 | + |
| 310 | +def parse_and_apply_numeric_filter(record_value: Dict, numeric_filter: List[str | int]) -> bool: |
| 311 | + if len(numeric_filter) % 2 > 0: |
| 312 | + LOG.warn("Invalid numeric lambda filter given") |
| 313 | + return True |
| 314 | + |
| 315 | + if not isinstance(record_value, (int, float)): |
| 316 | + LOG.warn(f"Record {record_value} seem not to be a valid number") |
| 317 | + return False |
| 318 | + |
| 319 | + for idx in range(0, len(numeric_filter), 2): |
| 320 | + |
| 321 | + try: |
| 322 | + if numeric_filter[idx] == ">" and not (record_value > float(numeric_filter[idx + 1])): |
| 323 | + return False |
| 324 | + if numeric_filter[idx] == ">=" and not (record_value >= float(numeric_filter[idx + 1])): |
| 325 | + return False |
| 326 | + if numeric_filter[idx] == "=" and not (record_value == float(numeric_filter[idx + 1])): |
| 327 | + return False |
| 328 | + if numeric_filter[idx] == "<" and not (record_value < float(numeric_filter[idx + 1])): |
| 329 | + return False |
| 330 | + if numeric_filter[idx] == "<=" and not (record_value <= float(numeric_filter[idx + 1])): |
| 331 | + return False |
| 332 | + except ValueError: |
| 333 | + LOG.warn( |
| 334 | + f"Could not convert filter value {numeric_filter[idx + 1]} to a valid number value for filtering" |
| 335 | + ) |
| 336 | + return True |
| 337 | + |
| 338 | + |
| 339 | +def verify_dict_filter(record_value: any, dict_filter: Dict[str, any]) -> bool: |
| 340 | + # https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax |
| 341 | + fits_filter = False |
| 342 | + for key, filter_value in dict_filter.items(): |
| 343 | + if key.lower() == "anything-but": |
| 344 | + fits_filter = record_value not in filter_value |
| 345 | + elif key.lower() == "numeric": |
| 346 | + fits_filter = parse_and_apply_numeric_filter(record_value, filter_value) |
| 347 | + elif key.lower() == "exists": |
| 348 | + fits_filter = bool(filter_value) # exists means that the key exists in the event record |
| 349 | + elif key.lower() == "prefix": |
| 350 | + if not isinstance(record_value, str): |
| 351 | + LOG.warn(f"Record Value {record_value} does not seem to be a valid string.") |
| 352 | + fits_filter = isinstance(record_value, str) and record_value.startswith( |
| 353 | + str(filter_value) |
| 354 | + ) |
| 355 | + |
| 356 | + if fits_filter: |
| 357 | + return True |
| 358 | + return fits_filter |
| 359 | + |
| 360 | + |
| <
10000
td data-grid-cell-id="diff-da118a05c30e47b0a45a82347563f0de41763281781993ab9c9185cceafe949e-302-361-1" data-selected="false" role="gridcell" style="background-color:var(--diffBlob-additionNum-bgColor, var(--diffBlob-addition-bgColor-num));text-align:center" tabindex="-1" valign="top" class="focusable-grid-cell diff-line-number position-relative left-side">361
+def filter_stream_record(filter_rule: Dict[str, any], record: Dict[str, any]) -> bool: |
| 362 | + if not filter_rule: |
| 363 | + return True |
| 364 | + # https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax |
| 365 | + filter_results = [] |
| 366 | + for key, value in filter_rule.items(): |
| 367 | + # check if rule exists in event |
| 368 | + record_value = ( |
| 369 | + record.get(key.lower(), record.get(key)) if isinstance(record, Dict) else None |
| 370 | + ) |
| 371 | + append_record = False |
| 372 | + if record_value is not None: |
| 373 | + # check if filter rule value is a list (leaf of rule tree) or a dict (rescursively call function) |
| 374 | + if isinstance(value, list): |
| 375 | + if len(value) > 0: |
| 376 | + if isinstance(value[0], (str, int)): |
| 377 | + append_record = record_value in value |
| 378 | + if isinstance(value[0], dict): |
| 379 | + append_record = verify_dict_filter(record_value, value[0]) |
| 380 | + else: |
| 381 | + LOG.warn(f"Empty lambda filter: {key}") |
| 382 | + elif isinstance(value, dict): |
| 383 | + append_record = filter_stream_record(value, record_value) |
| 384 | + else: |
| 385 | + # special case 'exists' |
| 386 | + if isinstance(value, list) and len(value) > 0: |
| 387 | + append_record = not value[0].get("exists", True) |
| 388 | + |
| 389 | + filter_results.append(append_record) |
| 390 | + return all(filter_results) |
| 391 | + |
| 392 | + |
| 393 | +def filter_stream_records(records, filters: List[FilterCriteria]): |
| 394 | + filtered_records = [] |
| 395 | + for record in records: |
| 396 | + for filter in filters: |
| 397 | + for rule in filter["Filters"]: |
| 398 | + if filter_stream_record(json.loads(rule["Pattern"]), record): |
| 399 | + filtered_records.append(record) |
| 400 | + break |
| 401 | + return filtered_records |
| 402 | + |
| 403 | + |
| 404 | +def contains_list(filter: Dict) -> bool: |
| 405 | + if isinstance(filter, dict): |
| 406 | + for key, value in filter.items(): |
| 407 | + if isinstance(value, list) and len(value) > 0: |
| 408 | + return True |
| 409 | + return contains_list(value) |
| 410 | + return False |
| 411 | + |
| 412 | + |
| 413 | +def validate_filters(filter: FilterCriteria) -> bool: |
| 414 | + # filter needs to be json serializeable |
| 415 | + for rule in filter["Filters"]: |
| 416 | + try: |
| 417 | + if not (filter_pattern := json.loads(rule["Pattern"])): |
| 418 | + return False |
| 419 | + return contains_list(filter_pattern) |
| 420 | + except json.JSONDecodeError: |
| 421 | + return False |
| 422 | + # needs to contain on what to filter (some list with citerias) |
| 423 | + # https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax |
| 424 | + |
| 425 | + return True |
| 426 | + |
| 427 | + |
| 428 | +def get_lambda_event_filters_for_arn(lambda_arn: str, event_arn: str) -> List[Dict]: |
| 429 | + # late import to avoid circular import |
| 430 | + from localstack.services.awslambda.lambda_api import LambdaRegion |
| 431 | + |
| 432 | + region_name = lambda_arn.split(":")[3] |
| 433 | + region = LambdaRegion.get(region_name) |
| 434 | + |
| 435 | + event_filter_criterias = [ |
| 436 | + event_source_mapping.get("FilterCriteria") |
| 437 | + for event_source_mapping in region.event_source_mappings |
| 438 | + if event_source_mapping.get("FunctionArn") == lambda_arn |
| 439 | + and event_source_mapping.get("EventSourceArn") == event_arn |
| 440 | + and event_source_mapping.get("FilterCriteria") is not None |
| 441 | + ] |
| 442 | + |
| 443 | + return event_filter_criterias |
0 commit comments