-
Notifications
You must be signed in to change notification settings - Fork 766
feat: Organizations Api uptake for twilio-python #815
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b4c5734
3e246e4
8395487
bc5c16b
a66f9e9
b5a6490
fac26ee
15e15c0
7b07ba7
af11fd2
661785d
6a8c2d8
1ba2f9b
98708f0
d78d5d5
7bdf1b5
bc77770
0211f23
27dec32
2959689
b973065
ceebd46
35b5015
76fecab
e9eaa72
1c8420c
644f94b
66f3e28
88f6623
630c28c
26ee4d3
ddb336b
d79366e
824ed9f
3670f03
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
from twilio.auth_strategy.auth_type import AuthType | ||
from abc import abstractmethod | ||
|
||
|
||
class AuthStrategy(object): | ||
def __init__(self, auth_type: AuthType): | ||
self._auth_type = auth_type | ||
|
||
@property | ||
def auth_type(self) -> AuthType: | ||
return self._auth_type | ||
|
||
@abstractmethod | ||
def get_auth_string(self) -> str: | ||
"""Return the authentication string.""" | ||
|
||
@abstractmethod | ||
def requires_authentication(self) -> bool: | ||
"""Return True if authentication is required, else False.""" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
from enum import Enum | ||
|
||
|
||
class AuthType(Enum): | ||
ORGS_TOKEN = "orgs_stoken" | ||
NO_AUTH = "noauth" | ||
BASIC = "basic" | ||
API_KEY = "api_key" | ||
CLIENT_CREDENTIALS = "client_credentials" | ||
|
||
def __str__(self): | ||
return self.value |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
from auth_type import AuthType | ||
from twilio.auth_strategy.auth_strategy import AuthStrategy | ||
|
||
|
||
class NoAuthStrategy(AuthStrategy): | ||
def __init__(self): | ||
super().__init__(AuthType.NO_AUTH) | ||
|
||
def get_auth_string(self) -> str: | ||
return "" | ||
|
||
def requires_authentication(self) -> bool: | ||
return False |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
import jwt | ||
import threading | ||
import logging | ||
from datetime import datetime | ||
|
||
from twilio.auth_strategy.auth_type import AuthType | ||
from twilio.auth_strategy.auth_strategy import AuthStrategy | ||
from twilio.http.token_manager import TokenManager | ||
|
||
|
||
class TokenAuthStrategy(AuthStrategy): | ||
def __init__(self, token_manager: TokenManager): | ||
super().__init__(AuthType.ORGS_TOKEN) | ||
self.token_manager = token_manager | ||
self.token = None | ||
self.lock = threading.Lock() | ||
logging.basicConfig(level=logging.INFO) | ||
self.logger = logging.getLogger(__name__) | ||
|
||
def get_auth_string(self) -> str: | ||
self.fetch_token() | ||
return f"Bearer {self.token}" | ||
|
||
def requires_authentication(self) -> bool: | ||
return True | ||
|
||
def fetch_token(self): | ||
if self.token is None or self.token == "" or self.is_token_expired(self.token): | ||
with self.lock: | ||
if ( | ||
self.token is None | ||
or self.token == "" | ||
or self.is_token_expired(self.token) | ||
): | ||
self.logger.info("New token fetched for accessing organization API") | ||
self.token = self.token_manager.fetch_access_token() | ||
|
||
def is_token_expired(self, token): | ||
try: | ||
decoded = jwt.decode(token, options={"verify_signature": False}) | ||
exp = decoded.get("exp") | ||
|
||
if exp is None: | ||
return True # No expiration time present, consider it expired | ||
|
||
|
||
# Check if the expiration time has passed | ||
return datetime.fromtimestamp(exp) < datetime.utcnow() | ||
|
||
except jwt.DecodeError: | ||
return True # Token is invalid | ||
except Exception as e: | ||
print(f"An error occurred: {e}") | ||
return True |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,10 +4,10 @@ | |
from urllib.parse import urlparse, urlunparse | ||
|
||
from twilio import __version__ | ||
from twilio.base.exceptions import TwilioException | ||
from twilio.http import HttpClient | ||
from twilio.http.http_client import TwilioHttpClient | ||
from twilio.http.response import Response | ||
from twilio.credential.cre 3262 dential_provider import CredentialProvider | ||
|
||
|
||
class ClientBase(object): | ||
|
@@ -23,6 +23,7 @@ def __init__( | |
environment: Optional[MutableMapping[str, str]] = None, | ||
edge: Optional[str] = None, | ||
user_agent_extensions: Optional[List[str]] = None, | ||
credential_provider: Optional[CredentialProvider] = None, | ||
): | ||
""" | ||
Initializes the Twilio Client | ||
|
@@ -35,7 +36,9 @@ def __init__( | |
:param environment: Environment to look for auth details, defaults to os.environ | ||
:param edge: Twilio Edge to make requests to, defaults to None | ||
:param user_agent_extensions: Additions to the user agent string | ||
:param credential_provider: credential provider for authentication method that needs to be used | ||
""" | ||
|
||
environment = environment or os.environ | ||
|
||
self.username = username or environment.get("TWILIO_ACCOUNT_SID") | ||
|
@@ -48,9 +51,8 @@ def __init__( | |
""" :type : str """ | ||
self.user_agent_extensions = user_agent_extensions or [] | ||
""" :type : list[str] """ | ||
|
||
AsabuHere marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if not self.username or not self.password: | ||
raise TwilioException("Credentials are required to create a TwilioClient") | ||
self.credential_provider = credential_provider or None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check this with Kridai, if existing customers use TwilioException - this is a breaking change There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check if the exception being thrown as 401 is getting wrapped in TwilioException and being sent to customer There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the below cases
Is this a breaking change? |
||
""" :type : CredentialProvider """ | ||
|
||
self.account_sid = account_sid or self.username | ||
""" :type : str """ | ||
|
@@ -85,15 +87,27 @@ def request( | |
|
||
:returns: Response from the Twilio API | ||
""" | ||
auth = self.get_auth(auth) | ||
headers = self.get_headers(method, headers) | ||
uri = self.get_hostname(uri) | ||
|
||
if self.credential_provider: | ||
|
||
auth_strategy = self.credential_provider.to_auth_strategy() | ||
AsabuHere marked this conversation as resolved.
Show resolved
Hide resolved
|
||
headers["Authorization"] = auth_strategy.get_auth_string() | ||
elif self.username is not None and self.password is not None: | ||
auth = self.get_auth(auth) | ||
AsabuHere marked this conversation as resolved.
Show resolved
Hide resolved
|
||
else: | ||
auth = None | ||
|
||
if method == "DELETE": | ||
del headers["Accept"] | ||
|
||
uri = self.get_hostname(uri) | ||
filtered_data = self.copy_non_none_values(data) | ||
return self.http_client.request( | ||
method, | ||
uri, | ||
params=params, | ||
data=data, | ||
data=filtered_data, | ||
headers=headers, | ||
auth=auth, | ||
timeout=timeout, | ||
|
@@ -132,21 +146,44 @@ async def request_async( | |
"http_client must be asynchronous to support async API requests" | ||
) | ||
|
||
auth = self.get_auth(auth) | ||
headers = self.get_headers(method, headers) | ||
uri = self.get_hostname(uri) | ||
AsabuHere marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if method == "DELETE": | ||
del headers["Accept"] | ||
|
||
if self.credential_provider: | ||
auth_strategy = self.credential_provider.to_auth_strategy() | ||
headers["Authorization"] = auth_strategy.get_auth_string() | ||
elif self.username is not None and self.password is not None: | ||
auth = self.get_auth(auth) | ||
else: | ||
auth = None | ||
|
||
uri = self.get_hostname(uri) | ||
filtered_data = self.copy_non_none_values(data) | ||
return await self.http_client.request( | ||
method, | ||
uri, | ||
params=params, | ||
data=data, | ||
data=filtered_data, | ||
headers=headers, | ||
auth=auth, | ||
timeout=timeout, | ||
allow_redirects=allow_redirects, | ||
) | ||
|
||
def copy_non_none_values(self, data): | ||
if isinstance(data, dict): | ||
return { | ||
k: self.copy_non_none_values(v) | ||
for k, v in data.items() | ||
if v is not None | ||
} | ||
elif isinstance(data, list): | ||
return [ | ||
self.copy_non_none_values(item) for item in data if item is not None | ||
] | ||
return data | ||
|
||
def get_auth(self, auth: Optional[Tuple[str, str]]) -> Tuple[str, str]: | ||
""" | ||
Get the request authentication object | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
from twilio.auth_strategy.auth_type import AuthType | ||
|
||
|
||
class CredentialProvider: | ||
def __init__(self, auth_type: AuthType): | ||
self._auth_type = auth_type | ||
|
||
@property | ||
def auth_type(self) -> AuthType: | ||
return self._auth_type | ||
|
||
def to_auth_strategy(self): | ||
raise NotImplementedError("Subclasses must implement this method") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
from twilio.http.orgs_token_manager import OrgTokenManager | ||
from twilio.base.exceptions import TwilioException | ||
from twilio.credential.credential_provider import CredentialProvider | ||
from twilio.auth_strategy.auth_type import AuthType | ||
from twilio.auth_strategy.token_auth_strategy import TokenAuthStrategy | ||
|
||
|
||
class OrgsCredentialProvider(CredentialProvider): | ||
def __init__(self, client_id: str, client_secret: str, token_manager=None): | ||
super().__init__(AuthType.CLIENT_CREDENTIALS) | ||
|
||
if client_id is None or client_secret is None: | ||
raise TwilioException("Client id and Client secret are mandatory") | ||
|
||
self.grant_type = "client_credentials" | ||
self.client_id = client_id | ||
self.client_secret = client_secret | ||
self.token_manager = token_manager | ||
self.auth_strategy = None | ||
|
||
def to_auth_strategy(self): | ||
if self.token_manager is None: | ||
self.token_manager = OrgTokenManager( | ||
self.grant_type, self.client_id, self.client_secret | ||
) | ||
if self.auth_strategy is None: | ||
self.auth_strategy = TokenAuthStrategy(self.token_manager) | ||
return self.auth_strategy |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
from twilio.http.token_manager import TokenManager | ||
from twilio.rest import Client | ||
|
||
|
||
class OrgTokenManager(TokenManager): | ||
""" | ||
Orgs Token Manager | ||
""" | ||
|
||
def __init__( | ||
self, | ||
grant_type: str, | ||
client_id: str, | ||
client_secret: str, | ||
code: str = None, | ||
redirect_uri: str = None, | ||
audience: str = None, | ||
refreshToken: str = None, | ||
scope: str = None, | ||
): | ||
self.grant_type = grant_type | ||
self.client_id = client_id | ||
self.client_secret = client_secret | ||
self.code = code | ||
self.redirect_uri = redirect_uri | ||
self.audience = audience | ||
self.refreshToken = refreshToken | ||
self.scope = scope | ||
self.client = Client() | ||
|
||
def fetch_access_token(self): | ||
token_instance = self.client.preview_iam.v1.token.create( | ||
grant_type=self.grant_type, | ||
client_id=self.client_id, | ||
client_secret=self.client_secret, | ||
code=self.code, | ||
redirect_uri=self.redirect_uri, | ||
audience=self.audience, | ||
scope=self.scope, | ||
) | ||
return token_instance.access_token |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from twilio.base.version import Version | ||
|
||
|
||
class TokenManager: | ||
|
||
def fetch_access_token(self, version: Version): | ||
pass |
Uh oh!
There was an error while loading. Please reload this page.