-
Notifications
You must be signed in to change notification settings - Fork 59
sigstore: add a CT keyring, use it for SCT verification #267
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
bb283dd
5d2db4f
4733289
3f61268
4f0fd02
b361dc2
18fceef
d7d6b1a
7d56849
b1f8142
59722d0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
# Copyright 2022 The Sigstore Authors | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
""" | ||
Functionality for interacting with CT ("CTFE") signing keys. | ||
""" | ||
|
||
from __future__ import annotations | ||
|
||
from importlib import resources | ||
from typing import List | ||
|
||
import cryptography.hazmat.primitives.asymmetric.padding as padding | ||
from cryptography.exceptions import InvalidSignature | ||
from cryptography.hazmat.primitives import hashes | ||
from cryptography.hazmat.primitives.asymmetric import ec, rsa | ||
|
||
from sigstore._utils import PublicKey, key_id, load_pem_public_key | ||
|
||
|
||
class CTKeyringError(Exception): | ||
""" | ||
Raised on failure by `CTKeyring.verify()`. | ||
""" | 10000||
|
||
pass | ||
|
||
|
||
class CTKeyringLookupError(CTKeyringError): | ||
""" | ||
A specialization of `CTKeyringError`, indicating that the specified | ||
key ID wasn't found in the keyring. | ||
""" | ||
|
||
pass | ||
|
||
|
||
class CTKeyring: | ||
""" | ||
Represents a set of CT signing keys, each of which is a potentially | ||
valid signer for a Signed Certificate Timestamp (SCT). | ||
|
||
This structure exists to facilitate key rotation in a CT log. | ||
""" | ||
|
||
def __init__(self, keys: List[PublicKey] = []): | ||
self._keyring = {} | ||
for key in keys: | ||
self._keyring[key_id(key)] = key | ||
|
||
@classmethod | ||
def staging(cls) -> CTKeyring: | ||
""" | ||
Returns a `CTKeyring` instance capable of verifying SCTs from | ||
Sigstore's staging deployment. | ||
""" | ||
keyring = cls() | ||
keyring._add_resource("ctfe.staging.pub") | ||
keyring._add_resource("ctfe_2022.staging.pub") | ||
keyring._add_resource("ctfe_2022.2.staging.pub") | ||
|
||
return keyring | ||
|
||
@classmethod | ||
def production(cls) -> CTKeyring: | ||
""" | ||
Returns a `CTKeyring` instance capable of verifying SCTs from | ||
Sigstore's production deployment. | ||
""" | ||
keyring = cls() | ||
keyring._add_resource("ctfe.pub") | ||
keyring._add_resource("ctfe_2022.pub") | ||
|
||
return keyring | ||
|
||
def _add_resource(self, name: str) -> None: | ||
""" | ||
Adds a key to the current keyring, as identified by its | ||
resource name under `sigstore._store`. | ||
""" | ||
key_pem = resources.read_binary("sigstore._store", name) | ||
self.add(key_pem) | ||
|
||
def add(self, key_pem: bytes) -> None: | ||
""" | ||
Adds a PEM-encoded key to the current keyring. | ||
""" | ||
key = load_pem_public_key(key_pem) | ||
self._keyring[key_id(key)] = key | ||
|
||
def verify(self, *, key_id: bytes, signature: bytes, data: bytes) -> None: | ||
key = self._keyring.get(key_id) | ||
if key is None: | ||
# If we don't have a key corresponding to this key ID, we can't | ||
# possibly verify the signature. | ||
raise CTKeyringLookupError(f"no known key for key ID {key_id.hex()}") | ||
|
||
try: | ||
if isinstance(key, rsa.RSAPublicKey): | ||
key.verify( | ||
signature=signature, | ||
data=data, | ||
padding=padding.PKCS1v15(), | ||
algorithm=hashes.SHA256(), | ||
) | ||
elif isinstance(key, ec.EllipticCurvePublicKey): | ||
key.verify( | ||
signature=signature, | ||
data=data, | ||
signature_algorithm=ec.ECDSA(hashes.SHA256()), | ||
) | ||
else: | ||
woodruffw marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# NOTE(ww): Unreachable without API misuse. | ||
raise CTKeyringError(f"unsupported key type: {key}") | ||
except InvalidSignature as exc: | ||
raise CTKeyringError("invalid signature") from exc |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,9 +27,11 @@ | |
|
||
import requests | ||
from cryptography.hazmat.primitives import serialization | ||
from cryptography.hazmat.primitives.asymmetric import ec, rsa | ||
from cryptography.hazmat.primitives.asymmetric import ec | ||
from pydantic import BaseModel, Field, StrictInt, StrictStr, validator | ||
|
||
from sigstore._internal.ctfe import CTKeyring | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
DEFAULT_REKOR_URL = "https://rekor.sigstore.dev" | ||
|
@@ -272,7 +274,7 @@ def post( | |
class RekorClient: | ||
"""The internal Rekor client""" | ||
|
||
def __init__(self, url: str, pubkey: bytes, ctfe_pubkey: bytes) -> None: | ||
def __init__(self, url: str, pubkey: bytes, ct_keyring: CTKeyring) -> None: | ||
self.url = urljoin(url, "api/v1/") | ||
self.session = requests.Session() | ||
self.session.headers.update( | ||
|
@@ -287,31 +289,20 @@ def __init__(self, url: str, pubkey: bytes, ctfe_pubkey: bytes) -> None: | |
raise RekorClientError(f"Invalid public key type: {pubkey}") | ||
self._pubkey = pubkey | ||
|
||
ctfe_pubkey = serialization.load_pem_public_key(ctfe_pubkey) | ||
if not isinstance( | ||
ctfe_pubkey, | ||
( | ||
rsa.RSAPublicKey, | ||
ec.EllipticCurvePublicKey, | ||
), | ||
): | ||
raise RekorClientError(f"Invalid CTFE public key type: {ctfe_pubkey}") | ||
self._ctfe_pubkey = ctfe_pubkey | ||
self._ct_keyring = ct_keyring | ||
|
||
def __del__(self) -> None: | ||
self.session.close() | ||
|
||
@classmethod | ||
def production(cls) -> RekorClient: | ||
return cls( | ||
DEFAULT_REKOR_URL, _DEFAULT_REKOR_ROOT_PUBKEY, _DEFAULT_REKOR_CTFE_PUBKEY | ||
DEFAULT_REKOR_URL, _DEFAULT_REKOR_ROOT_PUBKEY, CTKeyring.production() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, this would require larger changes than should be in this PR, but is there a separate client for CT? The CT log and Rekor are separate logs. The CT log is accessed at ctfe.sigstore.dev/, and while all Sigstore clients don't need to do online verification (since they only verify the SCTs offline), it's separate from rekor. I'd recommend splitting these apart, or at least associated CT with Fulcio rather than Rekor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, @asraa brought this up as feedback here: #263 (comment) I agree that this should be broken out, and I'll do that in a follow-up 🙂 |
||
) | ||
|
||
@classmethod | ||
def staging(cls) -> RekorClient: | ||
return cls( | ||
STAGING_REKOR_URL, _STAGING_REKOR_ROOT_PUBKEY, _STAGING_REKOR_CTFE_PUBKEY | ||
) | ||
return cls(STAGING_REKOR_URL, _STAGING_REKOR_ROOT_PUBKEY, CTKeyring.staging()) | ||
|
||
@property | ||
def log(self) -> RekorLog: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Flagging for review: I welcome a better name for this class (and its corresponding exception class) 🙂