o
    vhDV                     @   s`  d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	m
Z
 d dlmZ d dlmZ ddlmZmZ ddlmZ dd	lmZ dd
lmZ ddlmZmZ G dd de jZG dd dee jZG dd deZee
ejef  Z G dd dZ!G dd de!Z"G dd deZ#G dd dZ$G dd dZ%G dd dZ&G dd dee Z'G d d! d!eZ(dS )"    N)defaultdict)AsyncGeneratorIterableIteratorListOptionalUnion)x509)
trust_list   )CertTrustAnchorTrustAnchor)PathBuildingError)CertificateFetcher)ValidationPath)CancelableAsyncIteratorConsListc                   @   sD   e Zd ZdZdefddZdefddZdejfdd	Z	d
d Z
dS )CertificateCollectionzS
    Abstract base class for read-only access to a collection of certificates.
    key_identifierc                 C   s   |  |}|s	dS |d S )z
        Retrieves a cert via its key identifier

        :param key_identifier:
            A byte string of the key identifier

        :return:
            None or an asn1crypto.x509.Certificate object
        Nr   )retrieve_many_by_key_identifier)selfr   
candidates r   \/var/www/html/hyperkenya/venv/lib/python3.10/site-packages/pyhanko_certvalidator/registry.pyretrieve_by_key_identifier   s   

z0CertificateCollection.retrieve_by_key_identifierc                 C      t )z
        Retrieves possibly multiple certs via the corresponding key identifiers

        :param key_identifier:
            A byte string of the key identifier

        :return:
            A list of asn1crypto.x509.Certificate objects
        NotImplementedErrorr   r   r   r   r   r   '      
z5CertificateCollection.retrieve_many_by_key_identifiernamec                 C   r   )z
        Retrieves a list certs via their subject name

        :param name:
            An asn1crypto.x509.Name object

        :return:
            A list of asn1crypto.x509.Certificate objects
        r   r   r    r   r   r   retrieve_by_name3   r   z&CertificateCollection.retrieve_by_namec                 C   r   )a]  
        Retrieve a certificate by its ``issuer_serial`` value.

        :param issuer_serial:
            The ``issuer_serial`` value of the certificate.
        :return:
            The certificate corresponding to the ``issuer_serial`` key
            passed in.
        :return:
            None or an asn1crypto.x509.Certificate object
        r   r   issuer_serialr   r   r   retrieve_by_issuer_serial?      z/CertificateCollection.retrieve_by_issuer_serialN)__name__
__module____qualname____doc__bytesr   r   r	   Namer"   r%   r   r   r   r   r      s    r   c                   @   s<   e Zd ZdejdefddZdeej fddZdd	 Z	d
S )CertificateStorecertreturnc                 C   r   )
        Register a single certificate.

        :param cert:
            Certificate to add.
        :return:
            ``True`` if the certificate was added, ``False`` if it already
            existed in this store.
        r   r   r.   r   r   r   registerO   r   zCertificateStore.registercertsc                 C   s    d}|D ]	}||  |O }q|S )a  
        Register multiple certificates.

        :param certs:
            Certificates to register.
        :return:
            ``True`` if at least one certificate was added, ``False``
            if all certificates already existed in this store.
        Fr2   )r   r3   addedr.   r   r   r   register_multiple[   s   z"CertificateStore.register_multiplec                 C   r   Nr   r   r   r   r   __iter__k      zCertificateStore.__iter__N)
r'   r(   r)   r	   Certificateboolr2   r   r6   r9   r   r   r   r   r-   N   s    r-   c                   @   sn   e Zd ZdZedd Zdd Zdejde	fdd	Z
d
d Zdd ZdefddZdejfddZdd ZdS )SimpleCertificateStorez-
    Simple trustless certificate store.
    c                 C   s   |  }|D ]}| | q|S r7   r4   )clsr3   resultr.   r   r   r   
from_certst   s   z!SimpleCertificateStore.from_certsc                 C   s   i | _ tt| _tt| _d S r7   )r3   r   list_subject_map_key_identifier_mapr8   r   r   r   __init__{   s   
zSimpleCertificateStore.__init__r.   r/   c                 C   sd   |j | jv rdS || j|j < | j|jj | |jr&| j|j | dS | j|jj	 | dS )r0   FT)
r$   r3   rB   subjecthashableappendr   rC   
public_keysha1r1   r   r   r   r2      s   
zSimpleCertificateStore.registerc                 C   
   | j | S r7   )r3   )r   itemr   r   r   __getitem__      
z"SimpleCertificateStore.__getitem__c                 C   s   t | j S r7   )iterr3   valuesr8   r   r   r   r9      s   zSimpleCertificateStore.__iter__r   c                 C   rJ   r7   )rC   r   r   r   r   r      rM   z6SimpleCertificateStore.retrieve_many_by_key_identifierr    c                 C   s   | j |j S r7   )rB   rF   r!   r   r   r   r"      s   z'SimpleCertificateStore.retrieve_by_namec                 C   s    z| | W S  t y   Y d S w r7   )KeyErrorr#   r   r   r   r%      s
   
z0SimpleCertificateStore.retrieve_by_issuer_serialN)r'   r(   r)   r*   classmethodr@   rD   r	   r;   r<   r2   rL   r9   r+   r   r,   r"   r%   r   r   r   r   r=   o   s    
r=   c                   @   s<   e Zd ZdZdejdefddZdejdee	 fddZ
dS )	TrustManagerz%
    Abstract trust manager API.
    r.   r/   c                 C   r   z
        Checks if a certificate is in the list of trust roots in this registry

        :param cert:
            An asn1crypto.x509.Certificate object

        :return:
            A boolean - if the certificate is in the CA list
        r   r1   r   r   r   is_root   r   zTrustManager.is_rootc                 C   r   )z
        Find potential issuers that might have (directly) issued
        a particular certificate.

        :param cert:
            Issued certificate.
        :return:
            An iterator with potentially relevant trust anchors.
        r   r1   r   r   r   find_potential_issuers   r&   z#TrustManager.find_potential_issuersN)r'   r(   r)   r*   r	   r;   r<   rT   r   r   rU   r   r   r   r   rR      s    rR   c                   @   s   e Zd ZdZdd Ze		ddee dee dd fdd	Zd
e	e
ejf fddZdejfddZdeej fddZdejdee
 fddZdS )SimpleTrustManagerzk
    Trust manager backed by a list of trust roots, possibly in addition to the
    system trust list.
    c                 C   s   t  | _tt| _d S r7   )set_rootsr   rA   _root_subject_mapr8   r   r   r   rD      s   zSimpleTrustManager.__init__Ntrust_rootsextra_trust_rootsr/   c                 C   sT   |du rdd t  D }nt|}|dur|| t }|D ]}|| q |S )a  
        :param trust_roots:
            If the operating system's trust list should not be used, instead
            pass a list of asn1crypto.x509.Certificate objects. These
            certificates will be used as the trust roots for the path being
            built.

        :param extra_trust_roots:
            If the operating system's trust list should be used, but augmented
            with one or more extra certificates. This should be a list of
            asn1crypto.x509.Certificate objects.
        :return:
        Nc                 S   s   g | ]}|d  qS )r   r   ).0er   r   r   
<listcomp>   s    z,SimpleTrustManager.build.<locals>.<listcomp>)r
   get_listrA   extendrV   _register_root)r>   rZ   r[   manager
trust_rootr   r   r   build   s   
zSimpleTrustManager.buildrc   c                 C   sP   t |tr|}nt|}|| jvr&|j}| j| | j|jj 	| d S d S r7   )

isinstancer   r   rX   	authorityaddrY   r    rF   rG   )r   rc   anchorrf   r   r   r   ra      s   

z!SimpleTrustManager._register_rootr.   c                 C   s   t || jv S rS   )r   rX   r1   r   r   r   rT      s   zSimpleTrustManager.is_rootc                 C   s   dd | j D S )Nc                 s   s     | ]}t |tr|jV  qd S r7   )re   r   certificate)r\   rootr   r   r   	<genexpr>  s    
z0SimpleTrustManager.iter_certs.<locals>.<genexpr>)rX   r8   r   r   r   
iter_certs  s   zSimpleTrustManager.iter_certsc                 c   s0    |j j}| j| D ]}|j|r|V  q
d S r7   )issuerrF   rY   rf   is_potential_issuer_of)r   r.   issuer_hashablerj   r   r   r   rU     s   z)SimpleTrustManager.find_potential_issuers)NN)r'   r(   r)   r*   rD   rQ   r   TrustRootListrd   r   r   r	   r;   ra   rT   r   rl   rU   r   r   r   r   rV      s*    
rV   c                	       s   e Zd ZdZdddee f fddZe	ddddee	j
 dee fd	d
Z	dde	jdee	j
 f fddZde	j
dedeeee	j
f  fddZde	j
fddZ  ZS )CertificateRegistryz
    Contains certificate lists used to build validation paths, and
    is also capable of fetching missing certificates if a certificate
    fetcher is supplied.
    Ncert_fetcherrs   c                   s   t    || _d S r7   )superrD   fetcher)r   rs   	__class__r   r   rD   #  s   

zCertificateRegistry.__init__r   r3   c                C   s(   | |d}|D ]}| | q||_|S )a  
        Convenience method to set up a certificate registry and import
        certs into it.

        :param certs:
            Initial list of certificates to import.
        :param cert_fetcher:
            Certificate fetcher to handle retrieval of missing certificates
            (in situations where that is possible).
        :return:
            A populated certificate registry.
        rr   )r2   ru   )r>   r3   rs   r?   r.   r   r   r   rd   '  s
   
zCertificateRegistry.buildr    first_certificatec                    sN   g }d}t  |D ]}|r|j|jkr|}q
|| q
|r%|d| |S )af  
        Retrieves a list certs via their subject name

        :param name:
            An asn1crypto.x509.Name object

        :param first_certificate:
            An asn1crypto.x509.Certificate object that if found, should be
            placed first in the result list

        :return:
            A list of asn1crypto.x509.Certificate objects
        Nr   )rt   r"   sha256rG   insert)r   r    rx   outputfirstr.   rv   r   r   r"   B  s   z$CertificateRegistry.retrieve_by_namer.   trust_managerr/   c                 c   sp    |j j}||E d H  | j| D ]#}||rq|jr(|jr(|j|jkr'qn
|jr2|j|jkr2q|V  qd S r7   )	rm   rF   rU   rB   rT   authority_key_identifierr   authority_issuer_serialr$   )r   r.   r}   ro   rm   r   r   r   rU   `  s   
z*CertificateRegistry.find_potential_issuersc                 C  sJ   | j d u rd S dd | j |2 I d H }| | |D ]}|V  qd S )Nc                    s   g | z3 d H W }|q6 S r7   r   )r\   rm   r   r   r   r^   |  s    zGCertificateRegistry.fetch_missing_potential_issuers.<locals>.<listcomp>)ru   fetch_cert_issuersr6   )r   r.   issuersrm   r   r   r   fetch_missing_potential_issuersx  s   


z3CertificateRegistry.fetch_missing_potential_issuers)r   r7   )r'   r(   r)   r*   r   r   rD   rQ   r   r	   r;   rd   r,   r"   rR   r   r   r   rU   r   __classcell__r   r   rv   r   rq     s4    
rq   c                   @   sR   e Zd ZdZdedefddZdd Zdej	fd	d
Z
dej	dee fddZdS )PathBuilderz(
    Class to handle path building.
    r}   registryc                 C   s   || _ || _d S r7   )r}   r   )r   r}   r   r   r   r   rD     s   
zPathBuilder.__init__c                 C   s   t | |S )a  
        Builds a list of ValidationPath objects from a certificate in the
        operating system trust store to the end-entity certificate

        .. note::
            This is a synchronous equivalent of :meth:`async_build_paths`
            that calls the latter in a new event loop. As such, it can't be used
            from within asynchronous code.

        :param end_entity_cert:
            A byte string of a DER or PEM-encoded X.509 certificate, or an
            instance of asn1crypto.x509.Certificate

        :return:
            A list of pyhanko_certvalidator.path.ValidationPath objects that
            represent the possible paths from the end-entity certificate to one
            of the CA certs.
        )asynciorunasync_build_paths)r   end_entity_certr   r   r   build_paths  s   zPathBuilder.build_pathsr   c                    s.   g }|  |2 z3 dH W }|| q6 |S )a1  
        Builds a list of ValidationPath objects from a certificate in the
        operating system trust store to the end-entity certificate, returning
        all paths in a single list.

        :param end_entity_cert:
            A byte string of a DER or PEM-encoded X.509 certificate, or an
            instance of asn1crypto.x509.Certificate

        :return:
            A list of pyhanko_certvalidator.path.ValidationPath objects that
            represent the possible paths from the end-entity certificate to one
            of the CA certs.
        N)async_build_paths_lazyrG   )r   r   pathsr?   r   r   r   r     s   zPathBuilder.async_build_pathsr/   c                 C   s(   t | t|t|jg d}t||S )a  
        Builds a list of ValidationPath objects from a certificate in the
        operating system trust store to the end-entity certificate, and emit
        them as an asynchronous generator.

        :param end_entity_cert:
            A byte string of a DER or PEM-encoded X.509 certificate, or an
            instance of asn1crypto.x509.Certificate

        :return:
            An asynchronous iterator that yields
            pyhanko_certvalidator.path.ValidationPath objects that
            represent the possible paths from the end-entity certificate to one
            of the CA certs, and raises PathBuildingError
            if no paths could be built
        )path
certs_seenfailed_paths)_PathWalkerr   singr$   LazyPathIterator)r   r   walkerr   r   r   r     s   

z"PathBuilder.async_build_paths_lazyN)r'   r(   r)   r*   rR   rq   rD   r   r	   r;   r   r   r   r   r   r   r   r   r     s    
r   c                   @   s|   e Zd Zdddejdee fddZedd Z	d	d
 Z
dd Zdeeejf fddZdeeejf fddZdd ZdS )_IssuerFetcherpath_builderr   r.   r   c                 C   sL   || _ || _|| _| jj|| jj}t|| _d| _d| _	d | _
d| _d S )Nr   F)r.   r   r   r   rU   r}   rN   local_iss_iterlocal_issuers_foundfetched_issuers_found_fetched_cas_fetching_done)r   r   r.   r   local_issuersr   r   r   rD     s   

z_IssuerFetcher.__init__c                 C   s   | j | j S r7   )r   r   r8   r   r   r   issuers_found  s   z_IssuerFetcher.issuers_foundc                 C      | S r7   r   r8   r   r   r   	__aiter__  r:   z_IssuerFetcher.__aiter__c                 C   r   r7   r   r8   r   r   r   r9     r:   z_IssuerFetcher.__iter__r/   c                 C   sB   | j D ]}t|tjr|j}|| jv rq|  jd7  _|  S t)Nr   )r   re   r	   r;   r$   r   r   StopIterationr   rm   cert_idr   r   r   __next__  s   

z_IssuerFetcher.__next__c                    s   zt | W S  ty   Y nw | jd u r$| js$| js$| jj| j| _| jd urJ| j2 z3 d H W }|j	}|| j
v r;q,|  jd7  _|  S 6 d| _t)Nr   T)nextr   r   r   r   r   r   r   r.   r$   r   r   StopAsyncIterationr   r   r   r   	__anext__  s2   



z_IssuerFetcher.__anext__c                    s0   | j d ur| j  I d H  d | _ d| _d S d S )NT)r   acloser   r8   r   r   r   cancel  s   

z_IssuerFetcher.cancelN)r'   r(   r)   r	   r;   r   r+   rD   propertyr   r   r9   r   r   r   r   r   r   r   r   r   r     s    


r   c                
   @   sR   e Zd Zdddeej dee deeej  fddZdd	 Z	d
d Z
dd ZdS )r   r   r   r   r   r   c                 C   sF   || _ || _|| _|j}t|tjsJ t|||| _|| _	d | _
d S r7   )r   r   r   headre   r	   r;   r   _issuer_fetcherr   _next_level)r   r   r   r   r   r.   r   r   r   rD   %  s   
z_PathWalker.__init__c                    sJ   | j d ur| j  I d H  d | _ | jd ur#| j I d H  d | _d S d S r7   )r   r   r   r8   r   r   r   r   5  s   


z_PathWalker.cancelc                 C   r   r7   r   r8   r   r   r   r   =  r:   z_PathWalker.__aiter__c              
      s   | j d u rtd }|d u r~| jd u rcz
| j  I d H }W n ty9 } z| j js0| j| j d | _ |d }~ww t|t	rPt
| j}t||d d |d S t| j| j|| j|j| j| _z
| j I d H }W n tyy   d | _Y nw |d u s|S )N)r   r   r   r   r   r   rG   r   re   r   rA   r   r   r   consr   r$   )r   	next_pathnext_issuerr]   r3   r   r   r   r   @  s>   





z_PathWalker.__anext__N)r'   r(   r)   r   r	   r;   r+   r   rD   r   r   r   r   r   r   r   r   $  s    
r   c                   @   sP   e Zd ZU dZee ed< dedej	fddZ
dd Zd	d
 ZdefddZdS )r   N_as_rootr   r.   c                 C   s:   |j j|rtt|g d | _|| _d| _|jj	| _
d S )Nr   )r   r}   rT   r   r   r   _walkeremitted_countrE   human_friendly_name)r   r   r.   r   r   r   rD   f  s
   zLazyPathIterator.__init__c                    s$   | j d ur| j  I d H  d S d S r7   )r   r   r8   r   r   r   r   n  s   
zLazyPathIterator.cancelc                 C   r   r7   r   r8   r   r   r   r   r  r:   zLazyPathIterator.__aiter__r/   c                    s   | j d u rt| jd ur|  jd7  _d | _ | jS z| j  I d H }|  jd7  _|W S  ty5   Y nw | jdkr]| j jd j}t|tj	sJJ |j
j}d | _ td| j d| dt)Nr   r   z7Unable to build a validation path for the certificate "z" - no issuer matching "z" was found)r   r   r   r   r   r   r   re   r	   r;   rm   r   r   r   )r   r   	path_headmissing_issuer_namer   r   r   r   u  s6   


zLazyPathIterator.__anext__)r'   r(   r)   r   r   r   __annotations__r   r	   r;   rD   r   r   r   r   r   r   r   r   c  s   
 r   c                   @   sH   e Zd ZdZdee fddZdefddZde	j
fd	d
Zdd ZdS )LayeredCertificateStorezi
    Trustless certificate store that looks up certificates in other stores
    in a specific order.
    storesc                 C   s
   || _ d S r7   )_stores)r   r   r   r   r   rD     rM   z LayeredCertificateStore.__init__r   c                        fdd}t | S )Nc                  3   "    j D ]
} |  E d H  qd S r7   )r   r   storer   r   r   r   _gen     
zELayeredCertificateStore.retrieve_many_by_key_identifier.<locals>._genrA   )r   r   r   r   r   r   r        
z7LayeredCertificateStore.retrieve_many_by_key_identifierr    c                    r   )Nc                  3   r   r7   )r   r"   r   r    r   r   r   r     r   z6LayeredCertificateStore.retrieve_by_name.<locals>._genr   )r   r    r   r   r   r   r"     r   z(LayeredCertificateStore.retrieve_by_namec                 C   s*   | j D ]}||}|d ur|  S qd S r7   )r   r%   )r   r$   r   r?   r   r   r   r%     s   

z1LayeredCertificateStore.retrieve_by_issuer_serialN)r'   r(   r)   r*   r   r   rD   r+   r   r	   r,   r"   r%   r   r   r   r   r     s    r   ))abcr   collectionsr   typingr   r   r   r   r   r   
asn1cryptor	   oscryptor
   rf   r   r   errorsr   fetchersr   r   r   utilr   r   ABCr   r-   r=   r;   rp   rR   rV   rq   r   r   r   r   r   r   r   r   r   <module>   s.    <!8 RiSL?.