-
Notifications
You must be signed in to change notification settings - Fork 775
feat: Organizations Api uptake for twilio-python #815
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
b4c5734
3e246e4
8395487
bc5c16b
a66f9e9
b5a6490
fac26ee
15e15c0
7b07ba7
af11fd2
661785d
6a8c2d8
1ba2f9b
98708f0
d78d5d5
7bdf1b5
bc77770
0211f23
27dec32
2959689
b973065
ceebd46
35b5015
76fecab
e9eaa72
1c8420c
644f94b
66f3e28
88f6623
630c28c
26ee4d3
ddb336b
d79366e
824ed9f
3670f03
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
from twilio.authStrategy.authType import AuthType | ||
from enum import Enum | ||
from abc import abstractmethod | ||
|
||
|
||
class AuthStrategy(object): | ||
def __init__(self, auth_type: AuthType): | ||
self._auth_type = auth_type | ||
|
||
@property | ||
def auth_type(self) -> AuthType: | ||
return self._auth_type | ||
|
||
def get_auth_string(self) -> str: | ||
AsabuHere marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Return the authentication string.""" | ||
pass | ||
|
||
@abstractmethod | ||
def requires_authentication(self) -> bool: | ||
"""Return True if authentication is required, else False.""" | ||
pass |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
from enum import Enum | ||
|
||
class AuthType(Enum): | ||
TOKEN = 'token' | ||
AsabuHere marked this conversation as resolved.
Show resolved
Hide resolved
|
||
NO_AUTH = 'noauth' | ||
BASIC = 'basic' | ||
API_KEY = 'api_key' | ||
CLIENT_CREDENTIALS = 'client_credentials' | ||
|
||
def __str__(self): | ||
return self.value |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
import base64 | ||
from enum import Enum | ||
|
||
|
||
class BasicAuthStrategy(AuthStrategy): | ||
def __init__(self, username: str, password: str): | ||
super().__init__(AuthType.BASIC) | ||
self.username = username | ||
self.password = password | ||
|
||
def get_auth_string(self) -> str: | ||
credentials = f"{self.username}:{self.password}" | ||
encoded = base64.b64encode(credentials.encode('ascii')).decode('ascii') | ||
return f"Basic {encoded}" | ||
|
||
def requires_authentication(self) -> bool: | ||
return True |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
from auth_type import AuthType | ||
|
||
class NoAuthStrategy(AuthStrategy): | ||
def __init__(self): | ||
super().__init__(AuthType.NO_AUTH) | ||
|
||
def get_auth_string(self) -> str: | ||
return "" | ||
|
||
def requires_authentication(self) -> bool: | ||
return False |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
import jwt | ||
import threading | ||
from datetime import datetime, timedelta | ||
|
||
from twilio.authStrategy.authType import AuthType | ||
from twilio.authStrategy.authStrategy import AuthStrategy | ||
from twilio.http.token_manager import TokenManager | ||
|
||
|
||
class TokenAuthStrategy(AuthStrategy): | ||
def __init__(self, token_manager: TokenManager): | ||
super().__init__(AuthType.TOKEN) | ||
self.token_manager = token_manager | ||
self.token = None | ||
self.lock = threading.Lock() | ||
|
||
def get_auth_string(self) -> str: | ||
return f"Bearer {self.token}" | ||
|
||
def requires_authentication(self) -> bool: | ||
return True | ||
|
||
def fetch_token(self): | ||
if self.token is None or self.token == "" or self.is_token_expired(self.token): | ||
with self.lock: | ||
if self.token is None or self.token == "" or self.is_token_expired(self.token): | ||
self.token = self.token_manager.fetch_access_token() | ||
|
||
def is_token_expired(self, token): | ||
decoded_jwt = jwt.decode(token, options={"verify_signature": True}) | ||
expires_at = decoded_jwt.get("exp") | ||
# Add a buffer of 30 seconds | ||
buffer_seconds = 30 | ||
buffer_expires_at = expires_at - buffer_seconds | ||
return buffer_expires_at < datetime.datetime.now().timestamp() |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,10 +4,11 @@ | |
from urllib.parse import urlparse, urlunparse | ||
|
||
from twilio import __version__ | ||
from twilio.base.exceptions import TwilioException | ||
from twilio.http import HttpClient | ||
from twilio.http.http_client import TwilioHttpClient | ||
from twilio.http.response import Response | ||
from twilio.authStrategy.authType import AuthType | ||
from twilio.credential.credentialProvider import CredentialProvider | ||
|
||
|
||
class ClientBase(object): | ||
|
@@ -23,6 +24,7 @@ def __init__( | |
environment: Optional[MutableMapping[str, str]] = None, | ||
edge: Optional[str] = None, | ||
user_agent_extensions: Optional[List[str]] = None, | ||
credential_provider: Optional[CredentialProvider] = None, | ||
): | ||
""" | ||
Initializes the Twilio Client | ||
|
@@ -35,7 +37,9 @@ def __init__( | |
:param environment: Environment to look for auth details, defaults to os.environ | ||
:param edge: Twilio Edge to make requests to, defaults to None | ||
:param user_agent_extensions: Additions to the user agent string | ||
:param credential_provider: credential provider for authentication method that needs to be used | ||
""" | ||
|
||
environment = environment or os.environ | ||
|
||
self.username = username or environment.get("TWILIO_ACCOUNT_SID") | ||
|
@@ -48,9 +52,8 @@ def __init__( | |
""" :type : str """ | ||
self.user_agent_extensions = user_agent_extensions or [] | ||
""" :type : list[str] """ | ||
|
||
AsabuHere marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if not self.username or not self.password: | ||
raise TwilioException("Credentials are required to create a TwilioClient") | ||
self.credential_provider = credential_provider or None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check this with Kridai, if existing customers use TwilioException - this is a breaking change There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check if the exception being thrown as 401 is getting wrapped in TwilioException and being sent to customer There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the below cases
Is this a breaking change? |
||
""" :type : CredentialProvider """ | ||
|
||
self.account_sid = account_sid or self.username | ||
""" :type : str """ | ||
|
@@ -69,8 +72,6 @@ def request( | |
auth: Optional[Tuple[str, str]] = None, | ||
timeout: Optional[float] = None, | ||
allow_redirects: bool = False, | ||
is_oauth: bool = False, | ||
domain: Optional[str] = None, | ||
) -> Response: | ||
""" | ||
Makes a request to the Twilio API using the configured http client | ||
|
@@ -88,20 +89,26 @@ def request( | |
:returns: Response from the Twilio API | ||
""" | ||
|
||
print('*****') | ||
if not is_oauth: | ||
auth = self.get_auth(auth) | ||
headers = self.get_headers(method, headers) | ||
|
||
##If credential provider is provided by user, get the associated auth strategy | ||
##Using the auth strategy, fetch the auth string and set it to authorization header | ||
auth_strategy = None ##Initialization | ||
AsabuHere marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if self.credential_provider: | ||
print(f'Reached here 2 {self.credential_provider}') | ||
auth_strategy = self.credential_provider.to_auth_strategy() | ||
AsabuHere marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if auth_strategy.auth_type == AuthType.TOKEN: | ||
AsabuHere marked this conversation as resolved.
Show resolved
Hide resolved
|
||
auth_strategy.fetch_token() | ||
headers["Authorization"] = auth_strategy.get_auth_string() | ||
if auth_strategy.auth_type == AuthType.BASIC: | ||
headers["Authorization"] = auth_strategy.get_auth_string() | ||
else: | ||
auth = self.get_auth(auth) | ||
AsabuHere marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
print(f'auth2 *** {auth}') | ||
|
||
|
||
uri = self.get_hostname(uri) | ||
if is_oauth: | ||
OauthTokenBase = dynamic_import( | ||
"twilio.base.oauth_token_base", "OauthTokenBase" | ||
) | ||
token = OauthTokenBase().get_oauth_token( | ||
domain, "v1", self.username, self.password | ||
) | ||
headers["A 10000 uthorization"] = f"Bearer {token}" | ||
headers.get("Authorization") | ||
|
||
return self.http_client.request( | ||
method, | ||
|
@@ -124,7 +131,6 @@ async def request_async( | |
auth: Optional[Tuple[str, str]] = None, | ||
timeout: Optional[float] = None, | ||
allow_redirects: bool = False, | ||
is_oauth: bool = False, | ||
) -> Response: | ||
""" | ||
Asynchronously makes a request to the Twilio API using the configured http client | ||
|
@@ -146,19 +152,25 @@ async def request_async( | |
raise RuntimeError( | ||
"http_client must be asynchronous to support async API requests" | ||
) | ||
if not is_oauth: | ||
auth = self.get_auth(auth) | ||
|
||
|
||
headers = self.get_headers(method, headers) | ||
uri = self.get_hostname(uri) | ||
AsabuHere marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if is_oauth: | ||
OauthTokenBase = dynamic_import( | ||
"twilio.base.oauth_token_base", "OauthTokenBase" | ||
) | ||
token = OauthTokenBase().get_oauth_token( | ||
domain, "v1", self.username, self.password | ||
) | ||
headers["Authorization"] = f"Bearer {token}" | ||
headers.get("Authorization") | ||
|
||
##If credential provider is provided by user, get the associated auth strategy | ||
##Using the auth strategy, fetch the auth string and set it to authorization header | ||
auth_strategy = None ##Initialization | ||
AsabuHere marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if self.credential_provider: | ||
print(f'Reached here 1') | ||
auth_strategy = self.credential_provider.to_auth_strategy() | ||
if auth_strategy.auth_type == AuthType.TOKEN: | ||
auth_strategy.fetch_token() | ||
AsabuHere marked this conversation as resolved.
Show resolved
Hide resolved
|
||
headers["Authorization"] = auth_strategy.get_auth_string() | ||
if auth_strategy.auth_type == AuthType.BASIC: | ||
headers["Authorization"] = auth_strategy.get_auth_string() | ||
else: | ||
auth = self.get_auth(auth) | ||
|
||
print(f'auth2 *** {auth}') | ||
|
||
return await self.http_client.request( | ||
method, | ||
|
Uh oh!
There was an error while loading. Please reload this page.