8000 add API Gateway SQS proxy (#1354) · sharp-bits/localstack@96041e9 · GitHub
[go: up one dir, main page]

Skip to content

Commit 96041e9

Browse files
Mathew Powerwhummer
authored andcommitted
add API Gateway SQS proxy (localstack#1354)
1 parent b918f32 commit 96041e9

File tree

4 files changed

+82
-2
lines changed

4 files changed

+82
-2
lines changed

localstack/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
APPLICATION_AMZ_JSON_1_0 = 'application/x-amz-json-1.0'
7070
APPLICATION_AMZ_JSON_1_1 = 'application/x-amz-json-1.1'
7171
APPLICATION_JSON = 'application/json'
72+
APPLICATION_X_WWW_FORM_URLENCODED = 'application/x-www-form-urlencoded'
7273

7374
# Lambda defaults
7475
LAMBDA_TEST_ROLE = 'arn:aws:iam::%s:role/lambda-test-role' % TEST_AWS_ACCOUNT_ID

localstack/services/apigateway/apigateway_listener.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import re
22
import logging
33
import json
4+
from six.moves.urllib_parse import urljoin
5+
46
import requests
57
from requests.models import Response
68
from flask import Response as FlaskResponse
79
from localstack.constants import APPLICATION_JSON, PATH_USER_REQUEST
8-
from localstack.config import TEST_KINESIS_URL
10+
from localstack.config import TEST_KINESIS_URL, TEST_SQS_URL
911
from localstack.utils import common
1012
from localstack.utils.aws import aws_stack
1113
from localstack.utils.common import to_str
@@ -71,6 +73,19 @@ def forward_request(self, method, path, data, headers):
7173
result = common.make_http_request(url=TEST_KINESIS_URL,
7274
method='POST', data=new_request, headers=headers)
7375
return result
76+
77+
elif uri.startswith('arn:aws:apigateway:') and ':sqs:path' in uri:
78+
template = integration['requestTemplates'][APPLICATION_JSON]
79+
account_id, queue = uri.split('/')[-2:]
80+
81+
new_request = aws_stack.render_velocity_template(template, data) + '&QueueName=%s' % queue
82+
83+
headers = aws_stack.mock_aws_request_headers(service='sqs')
84+
85+
url = urljoin(TEST_SQS_URL, '%s/%s?%s' % (account_id, queue, new_request))
86+
result = common.make_http_request(url, method='POST', headers=headers)
87+
return result
88+
7489
else:
7590
msg = 'API Gateway action uri "%s" not yet implemented' % uri
7691
LOGGER.warning(msg)

localstack/utils/aws/aws_stack.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
import six
99
from localstack import config
1010
from localstack.constants import (REGION_LOCAL, DEFAULT_REGION, LOCALHOST, MOTO_ACCOUNT_ID,
11-
ENV_DEV, APPLICATION_AMZ_JSON_1_1, APPLICATION_AMZ_JSON_1_0, TEST_AWS_ACCOUNT_ID)
11+
ENV_DEV, APPLICATION_AMZ_JSON_1_1, APPLICATION_AMZ_JSON_1_0,
12+
APPLICATION_X_WWW_FORM_URLENCODED, TEST_AWS_ACCOUNT_ID)
1213
from localstack.utils.common import (
1314
run_safe, to_str, is_string, is_string_or_bytes, make_http_request,
1415
timestamp, is_port_open, get_service_protocol)
@@ -368,6 +369,13 @@ def s3_bucket_arn(bucket_name, account_id=None):
368369
return 'arn:aws:s3:::%s' % (bucket_name)
369370

370371

372+
def create_sqs_queue(queue_name, env=None):
373+
env = get_environment(env)
374+
# queue
375+
conn = connect_to_service('sqs', env=env)
376+
return conn.create_queue(QueueName=queue_name)
377+
378+
371379
def sqs_queue_arn(queue_name, account_id=None):
372380
account_id = get_account_id(account_id)
373381
# ElasticMQ sets a static region of "elasticmq"
@@ -385,10 +393,18 @@ def get_sqs_queue_url(queue_name):
385393
return response['QueueUrl']
386394

387395

396+
def sqs_receive_message(queue_name):
397+
client = connect_to_service('sqs')
398+
response = client.receive_message(QueueUrl=get_sqs_queue_url(queue_name))
399+
return response
400+
401+
388402
def mock_aws_request_headers(service='dynamodb'):
389403
ctype = APPLICATION_AMZ_JSON_1_0
390404
if service == 'kinesis':
391405
ctype = APPLICATION_AMZ_JSON_1_1
406+
elif service == 'sqs':
407+
ctype = APPLICATION_X_WWW_FORM_URLENCODED
392408
access_key = get_boto3_credentials().access_key
393409
headers = {
394410
'Content-Type': ctype,

tests/integration/test_api_gateway.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import base64
12
import re
23
import json
34
import unittest
@@ -34,6 +35,7 @@ class TestAPIGatewayIntegrations(unittest.TestCase):
3435
#end
3536
]
3637
}"""
38+
APIGATEWAY_SQS_DATA_INBOUND_TEMPLATE = "Action=SendMessage&MessageBody=$util.base64Encode($input.json('$'))"
3739

3840
# endpoint paths
3941
API_PATH_DATA_INBOUND = '/data'
@@ -46,6 +48,7 @@ class TestAPIGatewayIntegrations(unittest.TestCase):
4648

4749
# name of Kinesis stream connected to API Gateway
4850
TEST_STREAM_KINESIS_API_GW = 'test-stream-api-gw'
51+
TEST_SQS_QUEUE = 'test-sqs-queue-api-gw'
4952
TEST_STAGE_NAME = 'testing'
5053
TEST_LAMBDA_PROXY_BACKEND = 'test_lambda_apigw_backend'
5154
TEST_LAMBDA_PROXY_BACKEND_WITH_PATH_PARAM = 'test_lambda_apigw_backend_path_param'
@@ -77,6 +80,28 @@ def test_api_gateway_kinesis_integration(self):
7780
self.assertEqual(result['FailedRecordCount'], 0)
7881
self.assertEqual(len(result['Records']), len(test_data['records']))
7982

83+
def test_api_gateway_sqs_integration(self):
84+
# create target SQS stream
85+
aws_stack.create_sqs_queue(self.TEST_SQS_QUEUE)
86+
87+
# create API Gateway and connect it to the target queue
88+
result = self.connect_api_gateway_to_sqs('test_gateway4')
89+
90+
# generate test data
91+
test_data = {'spam': 'eggs'}
92+
93+
url = INBOUND_GATEWAY_URL_PATTERN.format(
94+
api_id=result['id'],
95+
stage_name=self.TEST_STAGE_NAME,
96+
path=self.API_PATH_DATA_INBOUND
97+
)
98+
result = requests.post(url, data=json.dumps(test_data))
99+
self.assertEqual(result.status_code, 200)
100+
101+
messages = aws_stack.sqs_receive_message(self.TEST_SQS_QUEUE)['Messages']
102+
self.assertEqual(len(messages), 1)
103+
self.assertEqual(json.loads(base64.b64decode(messages[0]['Body'])), test_data)
104+
80105
def test_api_gateway_http_integration(self):
81106
test_port = 12123
82107
backend_url = 'http://localhost:%s%s' % (test_port, self.API_PATH_HTTP_BACKEND)
@@ -279,6 +304,29 @@ def connect_api_gateway_to_kinesis(self, gateway_name, kinesis_stream):
279304
stage_name=self.TEST_STAGE_NAME
280305
)
281306

307+
def connect_api_gateway_to_sqs(self, gateway_name):
308+
resources = {}
309+
template = self.APIGATEWAY_SQS_DATA_INBOUND_TEMPLATE
310+
resource_path = self.API_PATH_DATA_INBOUND.replace('/', '')
311+
resources[resource_path] = [{
312+
'httpMethod': 'POST',
313+
'authorizationType': 'NONE',
314+
'integrations': [{
315+
'type': 'AWS',
316+
'uri': 'arn:aws:apigateway:%s:sqs:path/%s/%s' % (
317+
DEFAULT_REGION, TEST_AWS_ACCOUNT_ID, self.TEST_SQS_QUEUE
318+
),
319+
'requestTemplates': {
320+
'application/json': template
321+
},
322+
}]
323+
}]
324+
return aws_stack.create_api_gateway(
325+
name=gateway_name,
326+
resources=resources,
327+
stage_name=self.TEST_STAGE_NAME
328+
)
329+
282330
def connect_api_gateway_to_http(self, gateway_name, target_url, methods=[], path=None):
283331
if not methods:
284332
methods = ['GET', 'POST']

0 commit comments

Comments
 (0)
0