8000 Add access tokens to master · dozure/twilio-python@e0c29fa · GitHub
[go: up one dir, main page]

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit e0c29fa

Browse files
committed
Add access tokens to master
1 parent 27f9df2 commit e0c29fa

File tree

4 files changed

+165
-2
lines changed

4 files changed

+165
-2
lines changed

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
#
1414
# You need to have the setuptools module installed. Try reading the setuptools
1515
# documentation: http://pypi.python.org/pypi/setuptools
16-
REQUIRES = ["httplib2 >= 0.7", "six", "pytz"]
16+
REQUIRES = ["httplib2 >= 0.7", "six", "pytz", "pyjwt"]
1717

1818
if sys.version_info < (2, 6):
1919
REQUIRES.append('simplejson')

tests/test_access_token.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import unittest
2+
3+
from nose.tools import assert_equal
4+
from twilio.jwt import decode
5+
from twilio.access_token import AccessToken, ConversationsGrant, IpMessagingGrant
6+
7+
ACCOUNT_SID = 'AC123'
8+
SIGNING_KEY_SID = 'SK123'
9+
10+
11+
# python2.6 support
12+
def assert_is_not_none(obj):
13+
assert obj is not None, '%r is None' % obj
14+
15+
16+
class AccessTokenTest(unittest.TestCase):
17+
def _validate_claims(self, payload):
18+
assert_equal(SIGNING_KEY_SID, payload['iss'])
19+
assert_equal(ACCOUNT_SID, payload['sub'])
20+
assert_is_not_none(payload['nbf'])
21+
assert_is_not_none(payload['exp'])
22+
assert_equal(payload['nbf'] + 3600, payload['exp'])
23+
assert_is_not_none(payload['jti'])
24+
assert_equal('{0}-{1}'.format(payload['iss'], payload['nbf']),
25+
payload['jti'])
26+
assert_is_not_none(payload['grants'])
27+
28+
def test_empty_grants(self):
29+
scat = AccessToken(ACCOUNT_SID, SIGNING_KEY_SID, 'secret')
30+
token = str(scat)
31+
32+
assert_is_not_none(token)
33+
payload = decode(token, 'secret')
34+
self._validate_claims(payload)
35+
assert_equal({}, payload['grants'])
36+
37+
def test_conversations_grant(self):
38+
scat = AccessToken(ACCOUNT_SID, SIGNING_KEY_SID, 'secret')
39+
scat.add_grant(ConversationsGrant())
40+
41+
token = str(scat)
42+
assert_is_not_none(token)
43+
payload = decode(token, 'secret')
44+
self._validate_claims(payload)
45+
assert_equal(1, len(payload['grants']))
46+
assert_equal({}, payload['grants']['rtc'])
47+
48+
def test_ip_messaging_grant(self):
49+
scat = AccessToken(ACCOUNT_SID, SIGNING_KEY_SID, 'secret')
50+
scat.add_grant(IpMessagingGrant())
51+
52+
token = str(scat)
53+
assert_is_not_none(token)
54+
payload = decode(token, 'secret')
55+
self._validate_claims(payload)
56+
assert_equal(1, len(payload['grants']))
57+
assert_equal({}, payload['grants']['ip_messaging'])
58+
59+
def test_grants(self):
60+
scat = AccessToken(ACCOUNT_SID, SIGNING_KEY_SID, 'secret')
61+
scat.add_grant(ConversationsGrant())
62+
scat.add_grant(IpMessagingGrant())
63+
64+
token = str(scat)
65+
assert_is_not_none(token)
66+
payload = decode(token, 'secret')
67+
self._validate_claims(payload)
68+
assert_equal(2, len(payload['grants']))
69+
assert_equal({}, payload['grants']['rtc'])
70+
assert_equal({}, payload['grants']['ip_messaging'])

twilio/access_token.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import time
2+
import jwt
3+
4+
5+
class IpMessagingGrant(object):
6+
""" Grant to access Twilio IP Messaging """
7+
def __init__(self, service_sid=None, endpoint_id=None,
8+
role_sid=None, credential_sid=None):
9+
self.service_sid = service_sid
10+
self.endpoint_id = endpoint_id
11+
self.deployment_role_sid = role_sid
12+
self.push_credential_sid = credential_sid
13+
14+
@property
15+
def key(self):
16+
return "ip_messaging"
17+
18+
def to_payload(self):
19+
grant = {}
20+
if self.service_sid:
21+
grant['service_sid'] = self.service_sid
22+
if self.endpoint_id:
23+
grant['endpoint_id'] = self.endpoint_id
24+
if self.deployment_role_sid:
25+
grant['deployment_role_sid'] = self.deployment_role_sid
26+
if self.push_credential_sid:
27+
grant['push_credential_sid'] = self.push_credential_sid
28+
29+
return grant
30+
31+
32+
class ConversationsGrant(object):
33+
""" Grant to access Twilio Conversations """
34+
def __init__(self, configuration_profile_sid=None):
35+
self.configuration_profile_sid = configuration_profile_sid
36+
37+
@property
38+
def key(self):
39+
return "rtc"
40+
41+
def to_payload(self):
42+
grant = {}
43+
if self.configuration_profile_sid:
44+
grant['configuration_profile_sid'] = self.configuration_profile_sid
45+
46+
return grant
47+
48+
49+
class AccessToken(object):
50+
""" Access Token used to access Twilio Resources """
51+
def __init__(self, account_sid, signing_key_sid, secret,
52+
identity=None, ttl=3600):
53+
self.account_sid = account_sid
54+
self.signing_key_sid = signing_key_sid
55+
self.secret = secret
56+
57+
self.identity = identity
58+
self.ttl = ttl
59+
self.grants = []
60+
61+
def add_grant(self, grant):
62+
self.grants.append(grant)
63+
64+
def to_jwt(self, algorithm='HS256'):
65+
now = int(time.time())
66+
headers = {
67+
"typ": "JWT",
68+
"cty": "twilio-fpa;v=1"
69+
}
70+
71+
grants = {}
72+
if self.identity:
73+
grants["identity"] = self.identity
74+
75+
for grant in self.grants:
76+
grants[grant.key] = grant.to_payload()
77+
78+
payload = {
79+
"jti": '{0}-{1}'.format(self.signing_key_sid, now),
80+
"iss": self.signing_key_sid,
81+
"sub": self.account_sid,
82+
"nbf": now,
83+
"exp": now + self.ttl,
84+
"grants": grants
85+
}
86+
87+
return jwt.encode(payload, self.secret, headers=headers,
88+
algorithm=algorithm)
89+
90+
def __str__(self):
91+
return self.to_jwt().decode('utf-8')

twilio/jwt/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,11 @@ def base64url_encode(input):
4141
return base64.urlsafe_b64encode(input).decode('utf-8').replace('=', '')
4242

4343

44-
def encode(payload, key, algorithm='HS256'):
44+
def encode(payload, key, algorithm='HS256', headers=None):
4545
segments = []
4646
header = {"typ": "JWT", "alg": algorithm}
47+
if headers:
48+
header.update(headers)
4749
segments.append(base64url_encode(binary(json.dumps(header))))
4850
segments.append(base64url_encode(binary(json.dumps(payload))))
4951
sign_input = '.'.join(segments)

0 commit comments

Comments
 (0)
0