8000 sigstore: add a CT keyring, use it for SCT verification by woodruffw · Pull Request #267 · sigstore/sigstore-python · GitHub
[go: up one dir, main page]

Skip to content

sigstore: add a CT keyring, use it for SCT verification #267

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Oct 24, 2022
5 changes: 4 additions & 1 deletion sigstore/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from typing import Optional, TextIO, Union, cast

from sigstore import __version__
from sigstore._internal.ctfe import CTKeyring
from sigstore._internal.fulcio.client import DEFAULT_FULCIO_URL, FulcioClient
from sigstore._internal.oidc.ambient import (
GitHubOidcPermissionCredentialError,
Expand All @@ -35,6 +36,7 @@
)
from sigstore._internal.rekor.client import DEFAULT_REKOR_URL, RekorClient
from sigstore._sign import Signer
from sigstore._utils import load_pem_public_key
from sigstore._verify import (
CertificateVerificationFailure,
RekorEntryMissing,
Expand Down Expand Up @@ -357,10 +359,11 @@ def _sign(args: argparse.Namespace) -> None:
elif args.fulcio_url == DEFAULT_FULCIO_URL and args.rekor_url == DEFAULT_REKOR_URL:
signer = Signer.production()
else:
ct_keyring = CTKeyring([load_pem_public_key(args.ctfe_pem.read())])
signer = Signer(
fulcio=FulcioClient(args.fulcio_url),
rekor=RekorClient(
args.rekor_url, args.rekor_root_pubkey.read(), args.ctfe_pem.read()
args.rekor_url, args.rekor_root_pubkey.read(), ct_keyring
),
)

Expand Down
127 changes: 127 additions & 0 deletions sigstore/_internal/ctfe.py
10000
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# Copyright 2022 The Sigstore Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Functionality for interacting with CT ("CTFE") signing keys.
"""

from __future__ import annotations

from importlib import resources
from typing import List

import cryptography.hazmat.primitives.asymmetric.padding as padding
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec, rsa

from sigstore._utils import PublicKey, key_id, load_pem_public_key


class CTKeyringError(Exception):
"""
Raised on failure by `CTKeyring.verify()`.
"""

pass


class CTKeyringLookupError(CTKeyringError):
"""
A specialization of `CTKeyringError`, indicating that the specified
key ID wasn't found in the keyring.
"""

pass


class CTKeyring:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flagging for review: I welcome a better name for this class (and its corresponding exception class) 🙂

"""
Represents a set of CT signing keys, each of which is a potentially
valid signer for a Signed Certificate Timestamp (SCT).

This structure exists to facilitate key rotation in a CT log.
"""

def __init__(self, keys: List[PublicKey] = []):
self._keyring = {}
for key in keys:
self._keyring[key_id(key)] = key

@classmethod
def staging(cls) -> CTKeyring:
"""
Returns a `CTKeyring` instance capable of verifying SCTs from
Sigstore's staging deployment.
"""
keyring = cls()
keyring._add_resource("ctfe.staging.pub")
keyring._add_resource("ctfe_2022.staging.pub")
keyring._add_resource("ctfe_2022.2.staging.pub")

return keyring

@classmethod
def production(cls) -> CTKeyring:
"""
Returns a `CTKeyring` instance capable of verifying SCTs from
Sigstore's production deployment.
"""
keyring = cls()
keyring._add_resource("ctfe.pub")
keyring._add_resource("ctfe_2022.pub")

return keyring

def _add_resource(self, name: str) -> None:
"""
Adds a key to the current keyring, as identified by its
resource name under `sigstore._store`.
"""
key_pem = resources.read_binary("sigstore._store", name)
self.add(key_pem)

def add(self, key_pem: bytes) -> None:
"""
Adds a PEM-encoded key to the current keyring.
"""
key = load_pem_public_key(key_pem)
self._keyring[key_id(key)] = key

def verify(self, *, key_id: bytes, signature: bytes, data: bytes) -> None:
key = self._keyring.get(key_id)
if key is None:
# If we don't have a key corresponding to this key ID, we can't
# possibly verify the signature.
raise CTKeyringLookupError(f"no known key for key ID {key_id.hex()}")

try:
if isinstance(key, rsa.RSAPublicKey):
key.verify(
signature=signature,
data=data,
padding=padding.PKCS1v15(),
algorithm=hashes.SHA256(),
)
elif isinstance(key, ec.EllipticCurvePublicKey):
key.verify(
signature=signature,
data=data,
signature_algorithm=ec.ECDSA(hashes.SHA256()),
)
else:
# NOTE(ww): Unreachable without API misuse.
raise CTKeyringError(f"unsupported key type: {key}")
except InvalidSignature as exc:
raise CTKeyringError("invalid signature") from exc
23 changes: 7 additions & 16 deletions sigstore/_internal/rekor/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@

import requests
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.hazmat.primitives.asymmetric import ec
from pydantic import BaseModel, Field, StrictInt, StrictStr, validator

from sigstore._internal.ctfe import CTKeyring

logger = logging.getLogger(__name__)

DEFAULT_REKOR_URL = "https://rekor.sigstore.dev"
Expand Down Expand Up @@ -272,7 +274,7 @@ def post(
class RekorClient:
"""The internal Rekor client"""

def __init__(self, url: str, pubkey: bytes, ctfe_pubkey: bytes) -> None:
def __init__(self, url: str, pubkey: bytes, ct_keyring: CTKeyring) -> None:
self.url = urljoin(url, "api/v1/")
self.session = requests.Session()
self.session.headers.update(
Expand All @@ -287,31 +289,20 @@ def __init__(self, url: str, pubkey: bytes, ctfe_pubkey: bytes) -> None:
raise RekorClientError(f"Invalid public key type: {pubkey}")
self._pubkey = pubkey

ctfe_pubkey = serialization.load_pem_public_key(ctfe_pubkey)
if not isinstance(
ctfe_pubkey,
(
rsa.RSAPublicKey,
ec.EllipticCurvePublicKey,
),
):
raise RekorClientError(f"Invalid CTFE public key type: {ctfe_pubkey}")
self._ctfe_pubkey = ctfe_pubkey
self._ct_keyring = ct_keyring

def __del__(self) -> None:
self.session.close()

@classmethod
def production(cls) -> RekorClient:
return cls(
DEFAULT_REKOR_URL, _DEFAULT_REKOR_ROOT_PUBKEY, _DEFAULT_REKOR_CTFE_PUBKEY
DEFAULT_REKOR_URL, _DEFAULT_REKOR_ROOT_PUBKEY, CTKeyring.production()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, this would require larger changes than should be in this PR, but is there a separate client for CT? The CT log and Rekor are separate logs. The CT log is accessed at ctfe.sigstore.dev/, and while all Sigstore clients don't need to do online verification (since they only verify the SCTs offline), it's separate from rekor. I'd recommend splitting these apart, or at least associated CT with Fulcio rather than Rekor

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, @asraa brought this up as feedback here: #263 (comment)

I agree that this should be broken out, and I'll do that in a follow-up 🙂

)

@classmethod
def staging(cls) -> RekorClient:
return cls(
STAGING_REKOR_URL, _STAGING_REKOR_ROOT_PUBKEY, _STAGING_REKOR_CTFE_PUBKEY
)
return cls(STAGING_REKOR_URL, _STAGING_REKOR_ROOT_PUBKEY, CTKeyring.staging())

@property
def log(self) -> RekorLog:
Expand Down
Loading
0