8000 Identity tokens by jingming · Pull Request #240 · twilio/twilio-python · GitHub
[go: up one dir, main page]

Skip to content

Identity tokens #240

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8000
Merged
merged 3 commits into from
Dec 3, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ twilio-python Changelog

Here you can see the full list of changes between each twilio-python release.

Version 4.10.0
-------------

Released December 3, 2015:

- Add Access Tokens

Version 4.9.2
-------------

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#
# You need to have the setuptools module installed. Try reading the setuptools
# documentation: http://pypi.python.org/pypi/setuptools
REQUIRES = ["httplib2 >= 0.7", "six", "pytz"]
REQUIRES = ["httplib2 >= 0.7", "six", "pytz", "pyjwt"]

if sys.version_info < (2, 6):
REQUIRES.append('simplejson')
Expand Down
70 changes: 70 additions & 0 deletions tests/test_access_token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import unittest

from nose.tools import assert_equal
from twilio.jwt import decode
from twilio.access_token import AccessToken, ConversationsGrant, IpMessagingGrant

ACCOUNT_SID = 'AC123'
SIGNING_KEY_SID = 'SK123'


# python2.6 support
def assert_is_not_none(obj):
assert obj is not None, '%r is None' % obj


class AccessTokenTest(unittest.TestCase):
def _validate_claims(self, payload):
assert_equal(SIGNING_KEY_SID, payload['iss'])
assert_equal(ACCOUNT_SID, payload['sub'])
assert_is_not_none(payload['nbf'])
assert_is_not_none(payload['exp'])
assert_equal(payload['nbf'] + 3600, payload['exp'])
assert_is_not_none(payload['jti'])
assert_equal('{0}-{1}'.format(payload['iss'], payload['nbf']),
payload['jti'])
assert_is_not_none(payload['grants'])

def test_empty_grants(self):
scat = AccessToken(ACCOUNT_SID, SIGNING_KEY_SID, 'secret')
token = str(scat)

assert_is_not_none(token)
payload = decode(token, 'secret')
self._validate_claims(payload)
assert_equal({}, payload['grants'])

def test_conversations_grant(self):
scat = AccessToken(ACCOUNT_SID, SIGNING_KEY_SID, 'secret')
scat.add_grant(ConversationsGrant())

token = str(scat)
assert_is_not_none(token)
payload = decode(token, 'secret')
self._validate_claims(payload)
assert_equal(1, len(payload['grants']))
assert_equal({}, payload['grants']['rtc'])

def test_ip_messaging_grant(self):
scat = AccessToken(ACCOUNT_SID, SIGNING_KEY_SID, 'secret')
scat.add_grant(IpMessagingGrant())

token = str(scat)
assert_is_not_none(token)
payload = decode(token, 'secret')
self._validate_claims(payload)
assert_equal(1, len(payload['grants']))
assert_equal({}, payload['grants']['ip_messaging'])

def test_grants(self):
scat = AccessToken(ACCOUNT_SID, SIGNING_KEY_SID, 'secret')
scat.add_grant(ConversationsGrant())
scat.add_grant(IpMessagingGrant())

token = str(scat)
assert_is_not_none(token)
payload = decode(token, 'secret')
self._validate_claims(payload)
assert_equal(2, len(payload['grants']))
assert_equal({}, payload['grants']['rtc'])
assert_equal({}, payload['grants']['ip_messaging'])
91 changes: 91 additions & 0 deletions twilio/access_token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import time
import jwt


class IpMessagingGrant(object):
""" Grant to access Twilio IP Messaging """
def __init__(self, service_sid=None, endpoint_id=None,
role_sid=None, credential_sid=None):
self.service_sid = service_sid
self.endpoint_id = endpoint_id
self.deployment_role_sid = role_sid
self.push_credential_sid = credential_sid

@property
def key(self):
return "ip_messaging"

def to_payload(self):
grant = {}
if self.service_sid:
grant['service_sid'] = self.service_sid
if self.endpoint_id:
grant['endpoint_id'] = self.endpoint_id
if self.deployment_role_sid:
grant['deployment_role_sid'] = self.deployment_role_sid
if self.push_credential_sid:
grant['push_credential_sid'] = self.push_credential_sid

return grant


class ConversationsGrant(object):
""" Grant to access Twilio Conversations """
def __init__(self, configuration_profile_sid=None):
self.configuration_profile_sid = configuration_profile_sid

@property
def key(self):
return "rtc"

def to_payload(self):
grant = {}
if self.configuration_profile_sid:
grant['configuration_profile_sid'] = self.configuration_profile_sid

return grant


class AccessToken(object):
""" Access Token used to access Twilio Resources """
def __init__(self, account_sid, signing_key_sid, secret,
identity=None, ttl=3600):
self.account_sid = account_sid
self.signing_key_sid = signing_key_sid
self.secret = secret

self.identity = identity
self.ttl = ttl
self.grants = []

def add_grant(self, grant):
self.grants.append(grant)

def to_jwt(self, algorithm='HS256'):
now = int(time.time())
headers = {
"typ": "JWT",
"cty": "twilio-fpa;v=1"
}

grants = {}
if self.identity:
grants["identity"] = self.identity

for grant in self.grants:
grants[grant.key] = grant.to_payload()

payload = {
"jti": '{0}-{1}'.format(self.signing_key_sid, now),
"iss": self.signing_key_sid,
"sub": self.account_sid,
"nbf": now,
"exp": now + self.ttl,
"grants": grants
}

return jwt.encode(payload, self.secret, headers=headers,
algorithm=algorithm)

def __str__(self):
return self.to_jwt().decode('utf-8')
4 changes: 3 additions & 1 deletion twilio/jwt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ def base64url_encode(input):
return base64.urlsafe_b64encode(input).decode('utf-8').replace('=', '')


def encode(payload, key, algorithm='HS256'):
def encode(payload, key, algorithm='HS256', headers=None):
segments = []
header = {"typ": "JWT", "alg": algorithm}
if headers:
header.update(headers)
segments.append(base64url_encode(binary(json.dumps(header))))
segments.append(base64url_encode(binary(json.dumps(payload))))
sign_input = '.'.join(segments)
Expand Down
2 changes: 1 addition & 1 deletion twilio/version.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
__version_info__ = ('4', '9', '2')
__version_info__ = ('4', '10', '0')
__version__ = '.'.join(__version_info__)
0