10000 Merge pull request #309 from twilio/next-gen-client-validation · jstacoder/twilio-python@7b9bf77 · GitHub
[go: up one dir, main page]

Skip to content

Commit 7b9bf77

Browse files
authored
Merge pull request twilio#309 from twilio/next-gen-client-validation
ValidationClient for client validation, add ClientValidationJwt
2 parents 1fcd2c6 + f26d490 commit 7b9bf77

File tree

7 files changed

+582
-3
lines changed

7 files changed

+582
-3
lines changed

tests/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ nosexcover
66
flake8
77
mccabe
88
wheel>=0.22.0
9+
cryptography

tests/unit/http/__init__.py

Whitespace-only changes.
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import unittest
2+
3+
import mock
4+
from mock import patch, Mock
5+
from requests import Request
6+
from requests import Session
7+
8+
from twilio.http.validation_client import ValidationClient
9+
10+
11+
class TestValidationClientHelpers(unittest.TestCase):
12+
def setUp(self):
13+
self.request = Request(
14+
'GET',
15+
'https://api.twilio.com/2010-04-01/Accounts/AC123/Messages',
16+
auth=('Username', 'Password'),
17+
)
18+
self.request = self.request.prepare()
19+
self.client = ValidationClient('AC123', 'SK123', 'CR123', 'private_key')
20+
21+
def test_build_validation_payload_basic(self):
22+
validation_payload = self.client._build_validation_payload(self.request)
23+
self.assertEqual('GET', validation_payload.method)
24+
self.assertEqual('/2010-04-01/Accounts/AC123/Messages', validation_payload.path)
25+
self.assertEqual('', validation_payload.query_string)
26+
self.assertEqual(['authorization', 'host'], validation_payload.signed_headers)
27+
self.assertEqual('', validation_payload.body)
28+
29+
def test_build_validation_payload_query_string_parsed(self):
30+
self.request.url = self.request.url + '?QueryParam=1&Other=true'
31+
32+
validation_payload = self.client._build_validation_payload(self.request)
33+
34+
self.assertEqual('GET', validation_payload.method)
35+
self.assertEqual('/2010-04-01/Accounts/AC123/Messages', validation_payload.path)
36+
self.assertEqual('QueryParam=1&Other=true', validation_payload.query_string)
37+
self.assertEqual(['authorization', 'host'], validation_payload.signed_headers)
38+
self.assertEqual('', validation_payload.body)
39+
40+
def test_build_validation_payload_body_parsed(self):
41+
self.request.body = 'foobar'
42+
43+
validation_payload = self.client._build_validation_payload(self.request)
44+
45+
self.assertEqual('GET', validation_payload.method)
46+
self.assertEqual('/2010-04-01/Accounts/AC123/Messages', validation_payload.path)
47+
self.assertEqual('', validation_payload.query_string)
48+
self.assertEqual(['authorization', 'host'], validation_payload.signed_headers)
49+
self.assertEqual('foobar', validation_payload.body)
50+
51+
def test_build_validation_payload_complex(self):
52+
self.request.body = 'foobar'
53+
self.request.url = self.request.url + '?QueryParam=Value&OtherQueryParam=OtherValue'
54+
55+
validation_payload = self.client._build_validation_payload(self.request)
56+
57+
self.assertEqual('GET', validation_payload.method)
58+
self.assertEqual('/2010-04-01/Accounts/AC123/Messages', validation_payload.path)
59+
self.assertEqual(['authorization', 'host'], validation_payload.signed_headers)
60+
self.assertEqual('foobar', validation_payload.body)
61+
self.assertEqual('QueryParam=Value&OtherQueryParam=OtherValue',
62+
validation_payload.query_string)
63+
64+
def test_get_host(self):
65+
self.assertEqual('api.twilio.com', self.client._get_host(self.request))
66+
67+
68+
class TestValidationClientRequest(unittest.TestCase):
69+
70+
def setUp(self):
71+
self.session_patcher = patch('twilio.http.validation_client.Session')
72+
self.jwt_patcher = patch('twilio.http.validation_client.ClientValidationJwt')
73+
74+
self.session_mock = Mock(wraps=Session())
75+
self.validation_token = self.jwt_patcher.start()
76+
self.request_mock = Mock()
77+
78+
self.session_mock.prepare_request.return_value = self.request_mock
79+
self.session_mock.send.return_value = Mock(status_code=200, content='test')
80+
self.validation_token.return_value.to_jwt.return_value = 'test-token'
81+
self.request_mock.headers = {}
82+
83+
session_constructor_mock = self.session_patcher.start()
84+
session_constructor_mock.return_value = self.session_mock
85+
86+
self.client = ValidationClient('AC123', 'SK123', 'CR123', 'private_key')
87+
88+
def tearDown(self):
89+
self.session_patcher.stop()
90+
self.jwt_patcher.stop()
91+
92+
def test_request_does_not_overwrite_host_header(self):
93+
self.request_mock.url = 'https://api.twilio.com/'
94+
95+
self.client.request('doesnt matter', 'doesnt matter')
96+
97+
self.assertEqual('api.twilio.com', self.request_mock.headers['Host'])
98+
self.assertEqual('test-token', self.request_mock.headers['Twilio-Client-Validation'])
99+
100+
def test_request_sets_host_header_if_missing(self):
101+
self.request_mock.url = 'https://api.twilio.com/'
102+
self.request_mock.headers = {'Host': 'other.twilio.com'}
103+
104+
self.client.request('doesnt matter', 'doesnt matter')
105+
106+
self.assertEqual('other.twilio.com', self.request_mock.headers['Host'])
107+
self.assertEqual('test-token', self.request_mock.headers['Twilio-Client-Validation'])
Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
import unittest
2+
import time
3+
4+
from cryptography.hazmat.backends import default_backend
5+
from cryptography.hazmat.primitives.asymmetric import rsa
6+
from cryptography.hazmat.primitives.serialization import (
7+
Encoding,
8+
PublicFormat,
9+
PrivateFormat,
10+
NoEncryption
11+
)
12+
13+
from twilio.http.validation_client import ValidationPayload
14+
from twilio.jwt import Jwt
15+
from twilio.jwt.validation import ClientValidationJwt
16+
17+
18+
class ClientValidationJwtTest(unittest.TestCase):
19+
def test_generate_payload_basic(self):
20+
vp = ValidationPayload(
21+
method='GET',
22+
path='https://api.twilio.com/',
23+
query_string='q1=v1',
24+
signed_headers=['headerb', 'headera'],
25+
all_headers={'head': 'toe', 'headera': 'vala', 'headerb': 'valb'},
26+
body='me=letop&you=leworst'
27+
)
28+
29+
expected_payload = '\n'.join([
30+
'GET',
31+
'https://api.twilio.com/',
32+
'q1=v1',
33+
'headera:vala',
34+
'headerb:valb',
35+
'',
36+
'headera;headerb',
37+
'{}'.format(ClientValidationJwt._hash('me=letop&you=leworst'))
38+
])
39+
expected_payload = ClientValidationJwt._hash(expected_payload)
40+
41+
jwt = ClientValidationJwt('AC123', 'SK123', 'CR123', 'secret', vp)
42+
43+
actual_payload = jwt._generate_payload()
44+
self.assertEqual('headera;headerb', actual_payload['hrh'])
45+
self.assertEqual(expected_payload, actual_payload['rqh'])
46+
47+
def test_generate_payload_complex(self):
48+
vp = ValidationPayload(
49+
method='GET',
50+
path='https://api.twilio.com/',
51+
query_string='q1=v1&q2=v2&a=b',
52+
signed_headers=['headerb', 'headera'],
53+
all_headers={'head': 'toe', 'Headerb': 'valb', 'yeezy': 'weezy'},
54+
body='me=letop&you=leworst'
55+
)
56+
57+
expected_payload = '\n'.join([
58+
'GET',
59+
'https://api.twilio.com/',
60+
'a=b&q1=v1&q2=v2',
61+
'headerb:valb',
62+
'',
63+
'headera;headerb',
64+
'{}'.format(ClientValidationJwt._hash('me=letop&you=leworst'))
65+
])
66+
expected_payload = ClientValidationJwt._hash(expected_payload)
67+
68+
jwt = ClientValidationJwt('AC123', 'SK123', 'CR123', 'secret', vp)
69+
70+
actual_payload = jwt._generate_payload()
71+
self.assertEqual('headera;headerb', actual_payload['hrh'])
72+
self.assertEqual(expected_payload, actual_payload['rqh'])
73+
74+
def test_generate_payload_no_query_string(self):
75+
vp = ValidationPayload(
76+
method='GET',
77+
path='https://api.twilio.com/',
78+
query_string='',
79+
signed_headers=['headerb', 'headera'],
80+
all_headers={'head': 'toe', 'Headerb': 'valb', 'yeezy': 'weezy'},
81+
body='me=letop&you=leworst'
82+
)
83+
84+
expected_payload = '\n'.join([
85+
'GET',
86+
'https://api.twilio.com/',
87+
'',
88+
'headerb:valb',
89+
'',
90+
'headera;headerb',
91+
'{}'.format(ClientValidationJwt._hash('me=letop&you=leworst'))
92+
])
93+
expected_payload = ClientValidationJwt._hash(expected_payload)
94+
95+
jwt = ClientValidationJwt('AC123', 'SK123', 'CR123', 'secret', vp)
96+
97+
actual_payload = jwt._generate_payload()
98+
self.assertEqual('headera;headerb', actual_payload['hrh'])
99+
self.assertEqual(expected_payload, actual_payload['rqh'])
100+
101+
def test_generate_payload_no_req_body(self):
102+
vp = ValidationPayload(
103+
method='GET',
104+
path='https://api.twilio.com/',
105+
query_string='q1=v1',
106+
signed_headers=['headerb', 'headera'],
107+
all_headers={'head': 'toe', 'headera': 'vala', 'headerb': 'valb'},
108+
body=''
109+
)
110+
111+
expected_payload = '\n'.join([
112+
'GET',
113+
'https://api.twilio.com/',
114+
'q1=v1',
115 10000 +
'headera:vala',
116+
'headerb:valb',
117+
'',
118+
'headera;headerb',
119+
''
120+
])
121+
expected_payload = ClientValidationJwt._hash(expected_payload)
122+
123+
jwt = ClientValidationJwt('AC123', 'SK123', 'CR123', 'secret', vp)
124+
125+
actual_payload = jwt._generate_payload()
126+
self.assertEqual('headera;headerb', actual_payload['hrh'])
127+
self.assertEqual(expected_payload, actual_payload['rqh'])
128+
129+
def test_generate_payload_header_keys_lowercased(self):
130+
vp = ValidationPayload(
131+
method='GET',
132+
path='https://api.twilio.com/',
133+
query_string='q1=v1',
134+
signed_headers=['headerb', 'headera'],
135+
all_headers={'head': 'toe', 'Headera': 'vala', 'Headerb': 'valb'},
136+
body='me=letop&you=leworst'
137+
)
138+
139+
expected_payload = '\n'.join([
140+
'GET',
141+
'https://api.twilio.com/',
142+
'q1=v1',
143+
'headera:vala',
144+
'headerb:valb',
145+
'',
146+
'headera;headerb',
147+
'{}'.format(ClientValidationJwt._hash('me=letop&you=leworst'))
148+
])
149+
expected_payload = ClientValidationJwt._hash(expected_payload)
150+
151+
jwt = ClientValidationJwt('AC123', 'SK123', 'CR123', 'secret', vp)
152+
153+
actual_payload = jwt._generate_payload()
154+
self.assertEqual('headera;headerb', actual_payload['hrh'])
155+
self.assertEqual(expected_payload, actual_payload['rqh'])
156+
157+
def test_generate_payload_no_headers(self):
158+
vp = ValidationPayload(
159+
method='GET',
160+
path='https://api.twilio.com/',
161+
query_string='q1=v1',
162+
signed_headers=['headerb', 'headera'],
163+
all_headers={},
164+
body='me=letop&you=leworst'
165+
)
166+
167+
expected_payload = '\n'.join([
168+
'GET',
169+
'https://api.twilio.com/',
170+
'q1=v1',
171+
'',
172+
'headera;headerb',
173+
'{}'.format(ClientValidationJwt._hash('me=letop&you=leworst'))
174+
])
175+
expected_payload = ClientValidationJwt._hash(expected_payload)
176+
177+
jwt = ClientValidationJwt('AC123', 'SK123', 'CR123', 'secret', vp)
178+
179+
actual_payload = jwt._generate_payload()
180+
self.assertEqual('headera;headerb', actual_payload['hrh'])
181+
self.assertEqual(expected_payload, actual_payload['rqh'])
182+
183+
def test_generate_payload_schema_correct_1(self):
184+
"""Test against a known good rqh payload hash"""
185+
vp = ValidationPayload(
186+
method='GET',
187+
path='/Messages',
188+
query_string='PageSize=5&Limit=10',
189+
signed_headers=['authorization', 'host'],
190+
all_headers={'authorization': 'foobar', 'host': 'api.twilio.com'},
191+
body='foobar'
192+
)
193+
194+
expected_hash = '4dc9b67bed579647914587b0e22a1c65c1641d8674797cd82de65e766cce5f80'
195+
196+
jwt = ClientValidationJwt('AC123', 'SK123', 'CR123', 'secret', vp)
197+
198+
actual_payload = jwt._generate_payload()
199+
self.assertEqual('authorization;host', actual_payload['hrh'])
200+
self.assertEqual(expected_hash, actual_payload['rqh'])
201+
202+
def test_generate_payload_schema_correct_2(self):
203+
"""Test against a known good rqh payload hash"""
204+
vp = ValidationPayload(
205+
method='POST',
206+
path='/Messages',
207+
query_string='',
208+
signed_headers=['authorization', 'host'],
209+
all_headers={'authorization': 'foobar', 'host': 'api.twilio.com'},
210+
body='testbody'
211+
)
212+
213+
expected_hash = 'bd792c967c20d546c738b94068f5f72758a10d26c12979677501e1eefe58c65a'
214+
215+
jwt = ClientValidationJwt('AC123', 'SK123', 'CR123', 'secret', vp)
216+
217+
actual_payload = jwt._generate_payload()
218+
self.assertEqual('authorization;host', actual_payload['hrh'])
219+
self.assertEqual(expected_hash, actual_payload['rqh'])
220+
221+
def test_jwt_payload(self):
222+
vp = ValidationPayload(
223+
method='GET',
224+
path='/Messages',
225+
query_string='PageSize=5&Limit=10',
226+
signed_headers=['authorization', 'host'],
227+
all_headers={'authorization': 'foobar', 'host': 'api.twilio.com'},
228+
body='foobar'
229+
)
230+
expected_hash = '4dc9b67bed579647914587b0e22a1c65c1641d8674797cd82de65e766cce5f80'
231+
232+
jwt = ClientValidationJwt('AC123', 'SK123', 'CR123', 'secret', vp)
233+
234+
self.assertDictContainsSubset({
235+
'hrh': 'authorization;host',
236+
'rqh': expected_hash,
237+
'iss': 'SK123',
238+
'sub': 'AC123',
239+
}, jwt.payload)
240+
self.assertGreaterEqual(jwt.payload['exp'], time.time(), 'JWT exp is before now')
241+
self.assertLessEqual(jwt.payload['exp'], time.time() + 301, 'JWT exp is after now + 5mins')
242+
self.assertDictEqual({
243+
'alg': 'RS256',
244+
'typ': 'JWT',
245+
'cty': 'twilio-pkrv;v=1',
246+
'kid': 'CR123'
247+
}, jwt.headers)
248+
249+
def test_jwt_signing(self):
250+
vp = ValidationPayload(
251+
method='GET',
252+
path='/Messages',
253+
query_string='PageSize=5&Limit=10',
254+
signed_headers=['authorization', 'host'],
255+
all_headers={'authorization': 'foobar', 'host': 'api.twilio.com'},
256+
body='foobar'
257+
)
258+
expected_hash = '4dc9b67bed579647914587b0e22a1c65c1641d8674797cd82de65e766cce5f80'
259+
260+
private_key = rsa.generate_private_key(
261+
public_exponent=65537,
262+
key_size=2048,
263+
backend=default_backend()
264+
)
265+
public_key = private_key.public_key().public_bytes(Encoding.PEM, PublicFormat.PKCS1)
266+
private_key = private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption())
267+
268+
jwt = ClientValidationJwt('AC123', 'SK123', 'CR123', private_key, vp)
269+
decoded = Jwt.from_jwt(jwt.to_jwt(), public_key)
270+
271+
self.assertDictContainsSubset({
272+
'hrh': 'authorization;host',
273+
'rqh': expected_hash,
274+
'iss': 'SK123',
275+
'sub': 'AC123',
276+
}, decoded.payload)
277+
self.assertGreaterEqual(decoded.payload['exp'], time.time(), 'JWT exp is before now')
278+
self.assertLessEqual(decoded.payload['exp'], time.time() + 501, 'JWT exp is after now + 5m')
279+
self.assertDictEqual({
280+
'alg': 'RS256',
281+
'typ': 'JWT',
282+
'cty': 'twilio-pkrv;v=1',
283+
'kid': 'CR123'
284+
}, decoded.headers)
285+
286+

0 commit comments

Comments
 (0)
0