8000 feat: define useful properties on `google.auth.external_account.Crede… · zevdg/google-auth-library-python@f97499c · GitHub
[go: up one dir, main page]

Skip to content

Commit f97499c

Browse files
feat: define useful properties on google.auth.external_account.Credentials (googleapis#770)
This includes the following properties: - `info`: This is the reverse of `from_info` defined on subclasses and useful to serialize external account credentials. - `service_account_email`: This is the corresponding service account email if impersonation is used. - `is_user`: This is `False` for workload identity pools and `True` for workforce pools (not yet supported). This can be mainly determined from the STS audience. While the properties will primarily facilitate integration with gcloud, they are publicly useful for other contexts.
1 parent 458f40b commit f97499c

File tree

4 files changed

+231
-8
lines changed

4 files changed

+231
-8
lines changed

google/auth/external_account.py

Lines changed: 76 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@
2828
"""
2929

3030
import abc
31+
import copy
3132
import datetime
3233
import json
34+
import re
3335

3436
import six
3537

@@ -40,6 +42,8 @@
4042
from google.oauth2 import sts
4143
from google.oauth2 import utils
4244

45+
# External account JSON type identifier.
46+
_EXTERNAL_ACCOUNT_JSON_TYPE = "external_account"
4347
# The token exchange grant_type used for exchanging credentials.
4448
_STS_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
4549
# The token exchange requested_token_type. This is always an access_token.
@@ -117,6 +121,76 @@ def __init__(
117121
self._impersonated_credentials = None
118122
self._project_id = None
119123

124+
@property
125+
def info(self):
126+
"""Generates the dictionary representation of the current credentials.
127+
128+
Returns:
129+
Mapping: The dictionary representation of the credentials. This is the
130+
reverse of "from_info" defined on the subclasses of this class. It is
131+
useful for serializing the current credentials so it can deserialized
132+
later.
133+
"""
134+
config_info = {
135+
"type": _EXTERNAL_ACCOUNT_JSON_TYPE,
136+
"audience": self._audience,
137+
"subject_token_type": self._subject_token_type,
138+
"token_url": self._token_url,
139+
"service_account_impersonation_url": self._service_account_impersonation_url,
140+
"credential_source": copy.deepcopy(self._credential_source),
141+
"quota_project_id": self._quota_project_id,
142+
"client_id": self._client_id,
143+
"client_secret": self._client_secret,
144+
}
145+
# Remove None fields in the info dictionary.
146+
for k, v in dict(config_info).items():
147+
if v is None:
148+
del config_info[k]
149+
150+
return config_info
151+
152+
@property
153+
def service_account_email(self):
154+
"""Returns the service account email if service account impersonation is used.
155+
156+
Returns:
157+
Optional[str]: The service account email if impersonation is used. Otherwise
158+
None is returned.
159+
"""
160+
if self._service_account_impersonation_url:
161+
# Parse email from URL. The formal looks as follows:
162+
# https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/name@project-id.iam.gserviceaccount.com:generateAccessToken
163+
url = self._service_account_impersonation_url
164+
start_index = url.rfind("/")
165+
end_index = url.find(":generateAccessToken")
166+
if start_index != -1 and end_index != -1 and start_index < end_index:
167+
start_index = start_index + 1
168+
return url[start_index:end_index]
169+
return None
170+
171+
@property
172+
def is_user(self):
173+
"""Returns whether the credentials represent a user (True) or workload (False).
174+
Workloads behave similarly to service accounts. Currently workloads will use
175+
service account impersonation but will eventually not require impersonation.
176+
As a result, this property is more reliable than the service account email
177+
property in determining if the credentials represent a user or workload.
178+
179+
Returns:
180+
bool: True if the credentials represent a user. False if they represent a
181+
workload.
182+
"""
183+
# If service account impersonation is used, the credentials will always represent a
184+
# service account.
185+
if self._service_account_impersonation_url:
186+
return False
187+
# Workforce pools representing users have the following audience format:
188+
# //iam.googleapis.com/locations/$location/workforcePools/$poolId/providers/$providerId
189+
p = re.compile(r"//iam\.googleapis\.com/locations/[^/]+/workforcePools/")
190+
if p.match(self._audience):
191+
return True
192+
return False
193+
120194
@property
121195
def requires_scopes(self):
122196
"""Checks if the credentials requires scopes.
@@ -282,14 +356,8 @@ def _initialize_impersonated_credentials(self):
282356
)
283357

284358
# Determine target_principal.
285-
start_index = self._service_account_impersonation_url.rfind("/")
286-
end_index = self._service_account_impersonation_url.find(":generateAccessToken")
287-
if start_index != -1 and end_index != -1 and start_index < end_index:
288-
start_index = start_index + 1
289-
target_principal = self._service_account_impersonation_url[
290-
start_index:end_index
291-
]
292-
else:
359+
target_principal = self.service_account_email
360+
if not target_principal:
293361
raise exceptions.RefreshError(
294362
"Unable to determine target principal from service account impersonation URL."
295363
)

tests/test_aws.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -919,6 +919,19 @@ def test_constructor_invalid_environment_id_version(self):
919919

920920
assert excinfo.match(r"aws version '3' is not supported in the current build.")
921921

922+
def test_info(self):
923+
credentials = self.make_credentials(
924+
credential_source=self.CREDENTIAL_SOURCE.copy()
925+
)
926+
927+
assert credentials.info == {
928+
"type": "external_account",
929+
"audience": AUDIENCE,
930+
"subject_token_type": SUBJECT_TOKEN_TYPE,
931+ "token_url": TOKEN_URL,
932+
"credential_source": self.CREDENTIAL_SOURCE,
933+
}
934+
922935
def test_retrieve_subject_token_missing_region_url(self):
923936
# When AWS_REGION envvar is not available, region_url is required for
924937
# determining the current AWS region.

tests/test_external_account.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@
3131
# Base64 encoding of "username:password"
3232
BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
3333
SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
34+
# List of valid workforce pool audiences.
35+
TEST_USER_AUDIENCES = [
36+
"//iam.googleapis.com/locations/global/workforcePools/pool-id/providers/provider-id",
37+
"//iam.googleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
38+
"//iam.googleapis.com/locations/eu/workforcePools/workloadIdentityPools/providers/provider-id",
39+
]
3440

3541

3642
class CredentialsImpl(external_account.Credentials):
@@ -342,6 +348,116 @@ def test_with_invalid_impersonation_target_principal(self):
342348
r"Unable to determine target principal from service account impersonation URL."
343349
)
344350

351+
def test_info(self):
352+
credentials = self.make_credentials()
353+
354+
assert credentials.info == {
355+
"type": "external_account",
356+
"audience": self.AUDIENCE,
357+
"subject_token_type": self.SUBJECT_TOKEN_TYPE,
358+
"token_url": self.TOKEN_URL,
359+
"credential_source": self.CREDENTIAL_SOURCE.copy(),
360+
}
361+
362+
def test_info_with_full_options(self):
363+
credentials = self.make_credentials(
364+
client_id=CLIENT_ID,
365+
client_secret=CLIENT_SECRET,
366+
quota_project_id=self.QUOTA_PROJECT_ID,
367+
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
368+
)
369+
370+
assert credentials.info == {
371+
"type": "external_account",
372+
"audience": self.AUDIENCE,
373+
"subject_token_type": self.SUBJECT_TOKEN_TYPE,
374+
"token_url": self.TOKEN_URL,
375+
"service_account_impersonation_url": self.SERVICE_ACCOUNT_IMPERSONATION_URL,
376+
"credential_source": self.CREDENTIAL_SOURCE.copy(),
377+
"quota_project_id": self.QUOTA_PROJECT_ID,
378+
"client_id": CLIENT_ID,
379+
"client_secret": CLIENT_SECRET,
380+
}
381+
382+
def test_service_account_email_without_impersonation(self):
383+
credentials = self.make_credentials()
384+
385+
assert credentials.service_account_email is None
386+
387+
def test_service_account_email_with_impersonation(self):
388+
credentials = self.make_credentials(
389+
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
390+
)
391+
392+
assert credentials.service_account_email == SERVICE_ACCOUNT_EMAIL
393+
394+
@pytest.mark.parametrize(
395+
"audience",
396+
# Workload identity pool audiences or invalid workforce pool audiences.
397+
[
398+
# Legacy K8s audience format.
399+
"identitynamespace:1f12345:my_provider",
400+
(
401+
"//iam.googleapis.com/projects/123456/locations/"
402+
"global/workloadIdentityPools/pool-id/providers/"
403+
"provider-id"
404+
),
405+
(
406+
"//iam.googleapis.com/projects/123456/locations/"
407+
"eu/workloadIdentityPools/pool-id/providers/"
408+
"provider-id"
409+
),
410+
# Pool ID with workforcePools string.
411+
(
412+
"//iam.googleapis.com/projects/123456/locations/"
413+
"global/workloadIdentityPools/workforcePools/providers/"
414+
"provider-id"
415+
),
416+
# Unrealistic / incorrect workforce pool audiences.
417+
"//iamgoogleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
418+
"//iam.googleapiscom/locations/eu/workforcePools/pool-id/providers/provider-id",
419+
"//iam.googleapis.com/locations/workforcePools/pool-id/providers/provider-id",
420+
"//iam.googleapis.com/locations/eu/workforcePool/pool-id/providers/provider-id",
421+
"//iam.googleapis.com/locations//workforcePool/pool-id/providers/provider-id",
422+
],
423+
)
424+
def test_is_user_with_non_users(self, audience):
425+
credentials = CredentialsImpl(
426+
audience=audience,
427+
subject_token_type=self.SUBJECT_TOKEN_TYPE,
428+
token_url=self.TOKEN_URL,
429+
credential_source=self.CREDENTIAL_SOURCE,
430+
)
431+
432+
assert credentials.is_user is False
433+
434+
@pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
435+
def test_is_user_with_users(self, audience):
436+
credentials = CredentialsImpl(
437+
audience=audience,
438+
subject_token_type=self.SUBJECT_TOKEN_TYPE,
439+
token_url=self.TOKEN_URL,
440+
credential_source=self.CREDENTIAL_SOURCE,
441+
)
442+
443+
assert credentials.is_user is True
444+
445+
@pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
446+
def test_is_user_with_users_and_impersonation(self, audience):
447+
# Initialize the credentials with service account impersonation.
448+
credentials = CredentialsImpl(
449+
audience=audience,
450+
subject_token_type=self.SUBJECT_TOKEN_TYPE,
451+
token_url=self.TOKEN_URL,
452+
credential_source=self.CREDENTIAL_SOURCE,
453+
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
454+
)
455+
456+
# Even though the audience is for a workforce pool, since service account
457+
# impersonation is used, the credentials will represent a service account and
458+
# not a user.
459+
assert credentials.is_user is False
460+
345461
@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
346462
def test_refresh_without_client_auth_success(self, unused_utcnow):
347463
response = self.SUCCESS_RESPONSE.copy()

tests/test_identity_pool.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,32 @@ def test_constructor_missing_subject_token_field_name(self):
430430
r"Missing subject_token_field_name for JSON credential_source format"
431431
)
432432

433+
def test_info_with_file_credential_source(self):
434+
credentials = self.make_credentials(
435+
credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy()
436+
)
437+
438+
assert credentials.info == {
439+
"type": "external_account",
440+
"audience": AUDIENCE,
441+
"subject_token_type": SUBJECT_TOKEN_TYPE,
442+
"token_url": TOKEN_URL,
443+
"credential_source": self.CREDENTIAL_SOURCE_TEXT_URL,
444+
}
445+
446+
def test_info_with_url_credential_source(self):
447+
credentials = self.make_credentials(
448+
credential_source=self.CREDENTIAL_SOURCE_JSON_URL.copy()
449+
)
450+
451+
assert credentials.info == {
452+
"type": "external_account",
453+
"audience": AUDIENCE,
454+
"subject_token_type": SUBJECT_TOKEN_TYPE,
455+
"token_url": TOKEN_URL,
456+
"credential_source": self.CREDENTIAL_SOURCE_JSON_URL,
457+
}
458+
433459
def test_retrieve_subject_token_missing_subject_token(self, tmpdir):
434460
# Provide empty text file.
435461
empty_file = tmpdir.join("empty.txt")

0 commit comments

Comments
 (0)
0