HEX
Server: LiteSpeed
System: Linux php-prod-1.spaceapp.ru 5.15.0-157-generic #167-Ubuntu SMP Wed Sep 17 21:35:53 UTC 2025 x86_64
User: sport3497 (1034)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //lib/python3/dist-packages/cryptography/hazmat/backends/openssl/x509.py
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.


import datetime
import operator
import typing

from cryptography import utils, x509
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.backends.openssl import dsa, ec, rsa
from cryptography.hazmat.backends.openssl.decode_asn1 import (
    _asn1_integer_to_int,
    _asn1_string_to_bytes,
    _decode_x509_name,
    _obj2txt,
    _parse_asn1_time,
)
from cryptography.hazmat.backends.openssl.encode_asn1 import (
    _encode_asn1_int_gc,
    _txt2obj_gc,
)
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.x509.base import _PUBLIC_KEY_TYPES
from cryptography.x509.name import _ASN1Type


class _Certificate(x509.Certificate):
    # Keep-alive reference used by OCSP
    _ocsp_resp_ref: typing.Any

    def __init__(self, backend, x509_cert):
        self._backend = backend
        self._x509 = x509_cert

        version = self._backend._lib.X509_get_version(self._x509)
        if version == 0:
            self._version = x509.Version.v1
        elif version == 2:
            self._version = x509.Version.v3
        else:
            raise x509.InvalidVersion(
                "{} is not a valid X509 version".format(version), version
            )

    def __repr__(self):
        return "<Certificate(subject={}, ...)>".format(self.subject)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, _Certificate):
            return NotImplemented

        res = self._backend._lib.X509_cmp(self._x509, other._x509)
        return res == 0

    def __ne__(self, other: object) -> bool:
        return not self == other

    def __hash__(self) -> int:
        return hash(self.public_bytes(serialization.Encoding.DER))

    def __deepcopy__(self, memo):
        return self

    def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes:
        h = hashes.Hash(algorithm, self._backend)
        h.update(self.public_bytes(serialization.Encoding.DER))
        return h.finalize()

    version = utils.read_only_property("_version")

    @property
    def serial_number(self) -> int:
        asn1_int = self._backend._lib.X509_get_serialNumber(self._x509)
        self._backend.openssl_assert(asn1_int != self._backend._ffi.NULL)
        return _asn1_integer_to_int(self._backend, asn1_int)

    def public_key(self) -> _PUBLIC_KEY_TYPES:
        pkey = self._backend._lib.X509_get_pubkey(self._x509)
        if pkey == self._backend._ffi.NULL:
            # Remove errors from the stack.
            self._backend._consume_errors()
            raise ValueError("Certificate public key is of an unknown type")

        pkey = self._backend._ffi.gc(pkey, self._backend._lib.EVP_PKEY_free)

        return self._backend._evp_pkey_to_public_key(pkey)

    @property
    def not_valid_before(self) -> datetime.datetime:
        asn1_time = self._backend._lib.X509_get0_notBefore(self._x509)
        return _parse_asn1_time(self._backend, asn1_time)

    @property
    def not_valid_after(self) -> datetime.datetime:
        asn1_time = self._backend._lib.X509_get0_notAfter(self._x509)
        return _parse_asn1_time(self._backend, asn1_time)

    @property
    def issuer(self) -> x509.Name:
        issuer = self._backend._lib.X509_get_issuer_name(self._x509)
        self._backend.openssl_assert(issuer != self._backend._ffi.NULL)
        return _decode_x509_name(self._backend, issuer)

    @property
    def subject(self) -> x509.Name:
        subject = self._backend._lib.X509_get_subject_name(self._x509)
        self._backend.openssl_assert(subject != self._backend._ffi.NULL)
        return _decode_x509_name(self._backend, subject)

    @property
    def signature_hash_algorithm(
        self,
    ) -> typing.Optional[hashes.HashAlgorithm]:
        oid = self.signature_algorithm_oid
        try:
            return x509._SIG_OIDS_TO_HASH[oid]
        except KeyError:
            raise UnsupportedAlgorithm(
                "Signature algorithm OID:{} not recognized".format(oid)
            )

    @property
    def signature_algorithm_oid(self) -> x509.ObjectIdentifier:
        alg = self._backend._ffi.new("X509_ALGOR **")
        self._backend._lib.X509_get0_signature(
            self._backend._ffi.NULL, alg, self._x509
        )
        self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
        oid = _obj2txt(self._backend, alg[0].algorithm)
        return x509.ObjectIdentifier(oid)

    @utils.cached_property
    def extensions(self) -> x509.Extensions:
        return self._backend._certificate_extension_parser.parse(self._x509)

    @property
    def signature(self) -> bytes:
        sig = self._backend._ffi.new("ASN1_BIT_STRING **")
        self._backend._lib.X509_get0_signature(
            sig, self._backend._ffi.NULL, self._x509
        )
        self._backend.openssl_assert(sig[0] != self._backend._ffi.NULL)
        return _asn1_string_to_bytes(self._backend, sig[0])

    @property
    def tbs_certificate_bytes(self) -> bytes:
        pp = self._backend._ffi.new("unsigned char **")
        res = self._backend._lib.i2d_re_X509_tbs(self._x509, pp)
        self._backend.openssl_assert(res > 0)
        pp = self._backend._ffi.gc(
            pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
        )
        return self._backend._ffi.buffer(pp[0], res)[:]

    def public_bytes(self, encoding: serialization.Encoding) -> bytes:
        bio = self._backend._create_mem_bio_gc()
        if encoding is serialization.Encoding.PEM:
            res = self._backend._lib.PEM_write_bio_X509(bio, self._x509)
        elif encoding is serialization.Encoding.DER:
            res = self._backend._lib.i2d_X509_bio(bio, self._x509)
        else:
            raise TypeError("encoding must be an item from the Encoding enum")

        self._backend.openssl_assert(res == 1)
        return self._backend._read_mem_bio(bio)


class _RevokedCertificate(x509.RevokedCertificate):
    def __init__(self, backend, crl, x509_revoked):
        self._backend = backend
        # The X509_REVOKED_value is a X509_REVOKED * that has
        # no reference counting. This means when X509_CRL_free is
        # called then the CRL and all X509_REVOKED * are freed. Since
        # you can retain a reference to a single revoked certificate
        # and let the CRL fall out of scope we need to retain a
        # private reference to the CRL inside the RevokedCertificate
        # object to prevent the gc from being called inappropriately.
        self._crl = crl
        self._x509_revoked = x509_revoked

    @property
    def serial_number(self) -> int:
        asn1_int = self._backend._lib.X509_REVOKED_get0_serialNumber(
            self._x509_revoked
        )
        self._backend.openssl_assert(asn1_int != self._backend._ffi.NULL)
        return _asn1_integer_to_int(self._backend, asn1_int)

    @property
    def revocation_date(self) -> datetime.datetime:
        return _parse_asn1_time(
            self._backend,
            self._backend._lib.X509_REVOKED_get0_revocationDate(
                self._x509_revoked
            ),
        )

    @utils.cached_property
    def extensions(self) -> x509.Extensions:
        return self._backend._revoked_cert_extension_parser.parse(
            self._x509_revoked
        )


@utils.register_interface(x509.CertificateRevocationList)
class _CertificateRevocationList(object):
    def __init__(self, backend, x509_crl):
        self._backend = backend
        self._x509_crl = x509_crl

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, _CertificateRevocationList):
            return NotImplemented

        res = self._backend._lib.X509_CRL_cmp(self._x509_crl, other._x509_crl)
        return res == 0

    def __ne__(self, other: object) -> bool:
        return not self == other

    def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes:
        h = hashes.Hash(algorithm, self._backend)
        bio = self._backend._create_mem_bio_gc()
        res = self._backend._lib.i2d_X509_CRL_bio(bio, self._x509_crl)
        self._backend.openssl_assert(res == 1)
        der = self._backend._read_mem_bio(bio)
        h.update(der)
        return h.finalize()

    @utils.cached_property
    def _sorted_crl(self):
        # X509_CRL_get0_by_serial sorts in place, which breaks a variety of
        # things we don't want to break (like iteration and the signature).
        # Let's dupe it and sort that instead.
        dup = self._backend._lib.X509_CRL_dup(self._x509_crl)
        self._backend.openssl_assert(dup != self._backend._ffi.NULL)
        dup = self._backend._ffi.gc(dup, self._backend._lib.X509_CRL_free)
        return dup

    def get_revoked_certificate_by_serial_number(
        self, serial_number: int
    ) -> typing.Optional[x509.RevokedCertificate]:
        revoked = self._backend._ffi.new("X509_REVOKED **")
        asn1_int = _encode_asn1_int_gc(self._backend, serial_number)
        res = self._backend._lib.X509_CRL_get0_by_serial(
            self._sorted_crl, revoked, asn1_int
        )
        if res == 0:
            return None
        else:
            self._backend.openssl_assert(revoked[0] != self._backend._ffi.NULL)
            return _RevokedCertificate(
                self._backend, self._sorted_crl, revoked[0]
            )

    @property
    def signature_hash_algorithm(
        self,
    ) -> typing.Optional[hashes.HashAlgorithm]:
        oid = self.signature_algorithm_oid
        try:
            return x509._SIG_OIDS_TO_HASH[oid]
        except KeyError:
            raise UnsupportedAlgorithm(
                "Signature algorithm OID:{} not recognized".format(oid)
            )

    @property
    def signature_algorithm_oid(self) -> x509.ObjectIdentifier:
        alg = self._backend._ffi.new("X509_ALGOR **")
        self._backend._lib.X509_CRL_get0_signature(
            self._x509_crl, self._backend._ffi.NULL, alg
        )
        self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
        oid = _obj2txt(self._backend, alg[0].algorithm)
        return x509.ObjectIdentifier(oid)

    @property
    def issuer(self) -> x509.Name:
        issuer = self._backend._lib.X509_CRL_get_issuer(self._x509_crl)
        self._backend.openssl_assert(issuer != self._backend._ffi.NULL)
        return _decode_x509_name(self._backend, issuer)

    @property
    def next_update(self) -> datetime.datetime:
        nu = self._backend._lib.X509_CRL_get0_nextUpdate(self._x509_crl)
        self._backend.openssl_assert(nu != self._backend._ffi.NULL)
        return _parse_asn1_time(self._backend, nu)

    @property
    def last_update(self) -> datetime.datetime:
        lu = self._backend._lib.X509_CRL_get0_lastUpdate(self._x509_crl)
        self._backend.openssl_assert(lu != self._backend._ffi.NULL)
        return _parse_asn1_time(self._backend, lu)

    @property
    def signature(self) -> bytes:
        sig = self._backend._ffi.new("ASN1_BIT_STRING **")
        self._backend._lib.X509_CRL_get0_signature(
            self._x509_crl, sig, self._backend._ffi.NULL
        )
        self._backend.openssl_assert(sig[0] != self._backend._ffi.NULL)
        return _asn1_string_to_bytes(self._backend, sig[0])

    @property
    def tbs_certlist_bytes(self) -> bytes:
        pp = self._backend._ffi.new("unsigned char **")
        res = self._backend._lib.i2d_re_X509_CRL_tbs(self._x509_crl, pp)
        self._backend.openssl_assert(res > 0)
        pp = self._backend._ffi.gc(
            pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
        )
        return self._backend._ffi.buffer(pp[0], res)[:]

    def public_bytes(self, encoding: serialization.Encoding) -> bytes:
        bio = self._backend._create_mem_bio_gc()
        if encoding is serialization.Encoding.PEM:
            res = self._backend._lib.PEM_write_bio_X509_CRL(
                bio, self._x509_crl
            )
        elif encoding is serialization.Encoding.DER:
            res = self._backend._lib.i2d_X509_CRL_bio(bio, self._x509_crl)
        else:
            raise TypeError("encoding must be an item from the Encoding enum")

        self._backend.openssl_assert(res == 1)
        return self._backend._read_mem_bio(bio)

    def _revoked_cert(self, idx):
        revoked = self._backend._lib.X509_CRL_get_REVOKED(self._x509_crl)
        r = self._backend._lib.sk_X509_REVOKED_value(revoked, idx)
        self._backend.openssl_assert(r != self._backend._ffi.NULL)
        return _RevokedCertificate(self._backend, self, r)

    def __iter__(self):
        for i in range(len(self)):
            yield self._revoked_cert(i)

    def __getitem__(self, idx):
        if isinstance(idx, slice):
            start, stop, step = idx.indices(len(self))
            return [self._revoked_cert(i) for i in range(start, stop, step)]
        else:
            idx = operator.index(idx)
            if idx < 0:
                idx += len(self)
            if not 0 <= idx < len(self):
                raise IndexError
            return self._revoked_cert(idx)

    def __len__(self) -> int:
        revoked = self._backend._lib.X509_CRL_get_REVOKED(self._x509_crl)
        if revoked == self._backend._ffi.NULL:
            return 0
        else:
            return self._backend._lib.sk_X509_REVOKED_num(revoked)

    @utils.cached_property
    def extensions(self) -> x509.Extensions:
        return self._backend._crl_extension_parser.parse(self._x509_crl)

    def is_signature_valid(self, public_key: _PUBLIC_KEY_TYPES) -> bool:
        if not isinstance(
            public_key,
            (
                dsa._DSAPublicKey,
                rsa._RSAPublicKey,
                ec._EllipticCurvePublicKey,
            ),
        ):
            raise TypeError(
                "Expecting one of DSAPublicKey, RSAPublicKey,"
                " or EllipticCurvePublicKey."
            )
        res = self._backend._lib.X509_CRL_verify(
            self._x509_crl, public_key._evp_pkey
        )

        if res != 1:
            self._backend._consume_errors()
            return False

        return True


@utils.register_interface(x509.CertificateSigningRequest)
class _CertificateSigningRequest(object):
    def __init__(self, backend, x509_req):
        self._backend = backend
        self._x509_req = x509_req

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, _CertificateSigningRequest):
            return NotImplemented

        self_bytes = self.public_bytes(serialization.Encoding.DER)
        other_bytes = other.public_bytes(serialization.Encoding.DER)
        return self_bytes == other_bytes

    def __ne__(self, other: object) -> bool:
        return not self == other

    def __hash__(self) -> int:
        return hash(self.public_bytes(serialization.Encoding.DER))

    def public_key(self) -> _PUBLIC_KEY_TYPES:
        pkey = self._backend._lib.X509_REQ_get_pubkey(self._x509_req)
        self._backend.openssl_assert(pkey != self._backend._ffi.NULL)
        pkey = self._backend._ffi.gc(pkey, self._backend._lib.EVP_PKEY_free)
        return self._backend._evp_pkey_to_public_key(pkey)

    @property
    def subject(self) -> x509.Name:
        subject = self._backend._lib.X509_REQ_get_subject_name(self._x509_req)
        self._backend.openssl_assert(subject != self._backend._ffi.NULL)
        return _decode_x509_name(self._backend, subject)

    @property
    def signature_hash_algorithm(
        self,
    ) -> typing.Optional[hashes.HashAlgorithm]:
        oid = self.signature_algorithm_oid
        try:
            return x509._SIG_OIDS_TO_HASH[oid]
        except KeyError:
            raise UnsupportedAlgorithm(
                "Signature algorithm OID:{} not recognized".format(oid)
            )

    @property
    def signature_algorithm_oid(self) -> x509.ObjectIdentifier:
        alg = self._backend._ffi.new("X509_ALGOR **")
        self._backend._lib.X509_REQ_get0_signature(
            self._x509_req, self._backend._ffi.NULL, alg
        )
        self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
        oid = _obj2txt(self._backend, alg[0].algorithm)
        return x509.ObjectIdentifier(oid)

    @utils.cached_property
    def extensions(self) -> x509.Extensions:
        x509_exts = self._backend._lib.X509_REQ_get_extensions(self._x509_req)
        x509_exts = self._backend._ffi.gc(
            x509_exts,
            lambda x: self._backend._lib.sk_X509_EXTENSION_pop_free(
                x,
                self._backend._ffi.addressof(
                    self._backend._lib._original_lib, "X509_EXTENSION_free"
                ),
            ),
        )
        return self._backend._csr_extension_parser.parse(x509_exts)

    def public_bytes(self, encoding: serialization.Encoding) -> bytes:
        bio = self._backend._create_mem_bio_gc()
        if encoding is serialization.Encoding.PEM:
            res = self._backend._lib.PEM_write_bio_X509_REQ(
                bio, self._x509_req
            )
        elif encoding is serialization.Encoding.DER:
            res = self._backend._lib.i2d_X509_REQ_bio(bio, self._x509_req)
        else:
            raise TypeError("encoding must be an item from the Encoding enum")

        self._backend.openssl_assert(res == 1)
        return self._backend._read_mem_bio(bio)

    @property
    def tbs_certrequest_bytes(self) -> bytes:
        pp = self._backend._ffi.new("unsigned char **")
        res = self._backend._lib.i2d_re_X509_REQ_tbs(self._x509_req, pp)
        self._backend.openssl_assert(res > 0)
        pp = self._backend._ffi.gc(
            pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
        )
        return self._backend._ffi.buffer(pp[0], res)[:]

    @property
    def signature(self) -> bytes:
        sig = self._backend._ffi.new("ASN1_BIT_STRING **")
        self._backend._lib.X509_REQ_get0_signature(
            self._x509_req, sig, self._backend._ffi.NULL
        )
        self._backend.openssl_assert(sig[0] != self._backend._ffi.NULL)
        return _asn1_string_to_bytes(self._backend, sig[0])

    @property
    def is_signature_valid(self) -> bool:
        pkey = self._backend._lib.X509_REQ_get_pubkey(self._x509_req)
        self._backend.openssl_assert(pkey != self._backend._ffi.NULL)
        pkey = self._backend._ffi.gc(pkey, self._backend._lib.EVP_PKEY_free)
        res = self._backend._lib.X509_REQ_verify(self._x509_req, pkey)

        if res != 1:
            self._backend._consume_errors()
            return False

        return True

    def get_attribute_for_oid(self, oid: x509.ObjectIdentifier) -> bytes:
        obj = _txt2obj_gc(self._backend, oid.dotted_string)
        pos = self._backend._lib.X509_REQ_get_attr_by_OBJ(
            self._x509_req, obj, -1
        )
        if pos == -1:
            raise x509.AttributeNotFound(
                "No {} attribute was found".format(oid), oid
            )

        attr = self._backend._lib.X509_REQ_get_attr(self._x509_req, pos)
        self._backend.openssl_assert(attr != self._backend._ffi.NULL)
        # We don't support multiple valued attributes for now.
        self._backend.openssl_assert(
            self._backend._lib.X509_ATTRIBUTE_count(attr) == 1
        )
        asn1_type = self._backend._lib.X509_ATTRIBUTE_get0_type(attr, 0)
        self._backend.openssl_assert(asn1_type != self._backend._ffi.NULL)
        # We need this to ensure that our C type cast is safe.
        # Also this should always be a sane string type, but we'll see if
        # that is true in the real world...
        if asn1_type.type not in (
            _ASN1Type.UTF8String.value,
            _ASN1Type.PrintableString.value,
            _ASN1Type.IA5String.value,
        ):
            raise ValueError(
                "OID {} has a disallowed ASN.1 type: {}".format(
                    oid, asn1_type.type
                )
            )

        data = self._backend._lib.X509_ATTRIBUTE_get0_data(
            attr, 0, asn1_type.type, self._backend._ffi.NULL
        )
        self._backend.openssl_assert(data != self._backend._ffi.NULL)
        # This cast is safe iff we assert on the type above to ensure
        # that it is always a type of ASN1_STRING
        data = self._backend._ffi.cast("ASN1_STRING *", data)
        return _asn1_string_to_bytes(self._backend, data)


@utils.register_interface(
    x509.certificate_transparency.SignedCertificateTimestamp
)
class _SignedCertificateTimestamp(object):
    def __init__(self, backend, sct_list, sct):
        self._backend = backend
        # Keep the SCT_LIST that this SCT came from alive.
        self._sct_list = sct_list
        self._sct = sct

    @property
    def version(self) -> x509.certificate_transparency.Version:
        version = self._backend._lib.SCT_get_version(self._sct)
        assert version == self._backend._lib.SCT_VERSION_V1
        return x509.certificate_transparency.Version.v1

    @property
    def log_id(self) -> bytes:
        out = self._backend._ffi.new("unsigned char **")
        log_id_length = self._backend._lib.SCT_get0_log_id(self._sct, out)
        assert log_id_length >= 0
        return self._backend._ffi.buffer(out[0], log_id_length)[:]

    @property
    def timestamp(self) -> datetime.datetime:
        timestamp = self._backend._lib.SCT_get_timestamp(self._sct)
        milliseconds = timestamp % 1000
        return datetime.datetime.utcfromtimestamp(timestamp // 1000).replace(
            microsecond=milliseconds * 1000
        )

    @property
    def entry_type(self) -> x509.certificate_transparency.LogEntryType:
        entry_type = self._backend._lib.SCT_get_log_entry_type(self._sct)
        # We currently only support loading SCTs from the X.509 extension, so
        # we only have precerts.
        assert entry_type == self._backend._lib.CT_LOG_ENTRY_TYPE_PRECERT
        return x509.certificate_transparency.LogEntryType.PRE_CERTIFICATE

    @property
    def _signature(self):
        ptrptr = self._backend._ffi.new("unsigned char **")
        res = self._backend._lib.SCT_get0_signature(self._sct, ptrptr)
        self._backend.openssl_assert(res > 0)
        self._backend.openssl_assert(ptrptr[0] != self._backend._ffi.NULL)
        return self._backend._ffi.buffer(ptrptr[0], res)[:]

    def __hash__(self) -> int:
        return hash(self._signature)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, _SignedCertificateTimestamp):
            return NotImplemented

        return self._signature == other._signature

    def __ne__(self, other: object) -> bool:
        return not self == other