o
    h{0                     @   sl  d Z ddlmZ ddlmZmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZmZ ddlmZ ddlmZ z*ddlZddlmZ ddlmZ ddl m!Z! ddl"m#Z# ddl$m%Z% ddl&m'Z' W n e(y   dZ#Y nw z
ddl"m)Z)m*Z* W n e(y   dZ)dZ*Y nw dZ+ee,Z-G dd deZ.G dd de.Z/dS )z5Google Cloud Storage result store backend for Celery.    )ThreadPoolExecutor)datetime	timedelta)getpid)RLock)bytes_to_str)
dictfilter)url_to_parts)maybe_signature)
ChordErrorImproperlyConfigured)GroupResultallow_join_result)
get_logger   )KeyValueStoreBackendNretry)Conflict)if_exception_type)storage)Client)DEFAULT_RETRY)	firestorefirestore_admin_v1)
GCSBackendc                       sp   e Zd ZdZ fddZdd Zdd Zdd	 Zd
d Ze	dd Z
e	dd Zdd Zdd Zdd Z  ZS )GCSBackendBasez)Google Cloud Storage task result backend.c                    s  t stdt jdi | t | _t | _t| _	d | _
| jj}| jr2|  }|jdi t| |d| _| js?td|d| _| jsLtd|ddd| _t|d	d
| _t|dpfd| _| jdk rwtd| j d| jr|  std| j d S d S )Nz8You must install google-cloud-storage to use gcs backend
gcs_bucketz:Missing bucket name: specify gcs_bucket to use gcs backendgcs_projectz6Missing project:specify gcs_project to use gcs backendgcs_base_path /gcs_threadpool_maxsize
   gcs_ttlr   zInvalid ttl: z# must be greater than or equal to 0z>Missing lifecycle rule to use gcs backend with ttl on bucket:  )r   r   super__init__r   _client_lockr   _pidr   _retry_policy_clientappconfurl_params_from_urlupdater   getbucket_nameprojectstrip	base_pathint_threadpool_maxsizefloatttl _is_bucket_lifecycle_rule_exists)selfkwargsr-   
url_params	__class__r%   U/var/www/html/optinet_system/venv/lib/python3.10/site-packages/celery/backends/gcs.pyr'   -   sN   
zGCSBackendBase.__init__c                 C   s<   t |}| |}z|j| jdW S  tjjy   Y d S w Nr   )r   	_get_blobdownload_as_bytesr*   r   blobNotFoundr;   keyrD   r%   r%   r@   r1   U   s   
zGCSBackendBase.getc                 C   sB   t |}| |}| jrt t| jd |_|j|| jd d S )Nsecondsr   )	r   rB   r9   r   utcnowr   custom_timeupload_from_stringr*   )r;   rG   valuerD   r%   r%   r@   set]   s
   
zGCSBackendBase.setc                 C   s0   t |}| |}| r|j| jd d S d S rA   )r   rB   existsdeleter*   rF   r%   r%   r@   rP   d   s
   
zGCSBackendBase.deletec                 C   s<   t  }t|| j|W  d    S 1 sw   Y  d S N)r   listmapr1   )r;   keyspoolr%   r%   r@   mgetj   s   $zGCSBackendBase.mgetc                 C   s   | j F | jr| jt kr| jW  d   S t| jd| _t | _tjj| j	| j	dd}| jj
}|d| |jjd| | jW  d   S 1 sLw   Y  dS )zReturns a storage client.Nr3      )pool_connectionspool_maxsizemax_retrieszhttps://)r(   r+   r)   r   r   r3   requestsadaptersHTTPAdapterr7   _httpmount_auth_requestsession)r;   adapterclient_httpr%   r%   r@   clientn   s    $zGCSBackendBase.clientc                 C   s   | j | jS rQ   )re   bucketr2   r;   r%   r%   r@   rf      s   zGCSBackendBase.bucketc                 C   s&   | j r| j  d| n|}| j|S )Nr!   )r5   rf   rD   )r;   rG   key_bucket_pathr%   r%   r@   rB      s   zGCSBackendBase._get_blobc                 C   s4   | j }|  |jD ]}|d d dkr dS q
dS )NactiontypeDeleteTF)rf   reloadlifecycle_rules)r;   rf   ruler%   r%   r@   r:      s   
z/GCSBackendBase._is_bucket_lifecycle_rule_existsc                 C   s   t | j}|j|jd|jS )N)r   r   )r	   r.   hostnamepathquery)r;   	url_partsr%   r%   r@   r/      s   
zGCSBackendBase._params_from_url)__name__
__module____qualname____doc__r'   r1   rN   rP   rV   propertyre   rf   rB   r:   r/   __classcell__r%   r%   r>   r@   r   *   s    (

r   c                       s   e Zd ZdZdZdZdZdZdZ fddZ	e
dd	 Zd
d Z fddZdedefddZdd Zdd Zdd Zdd Zdd Z  ZS )r   zWGoogle Cloud Storage task result backend.

    Uses Firestore for chord ref count.
    Tcelerychord_count
expires_atc                    sh   t rtstdt jdi | t | _d | _| jj	
d| j| _|  s2td| j d| j d S )Nz:You must install google-cloud-firestore to use gcs backendfirestore_projectzHMissing TTL policy to use gcs backend with ttl on Firestore collection: z
 project: r%   )r   r   r   r&   r'   r   _firestore_lock_firestore_clientr,   r-   r1   r3   r|    _is_firestore_ttl_policy_enabled_collection_name)r;   r<   r>   r%   r@   r'      s&   zGCSBackend.__init__c                 C   sn   | j ) | jr| jt kr| jW  d   S tj| jd| _t | _W d   | jS 1 s/w   Y  | jS )zReturns a firestore client.NrW   )r}   r~   r)   r   r   r   r|   rg   r%   r%   r@   firestore_client   s   

zGCSBackend.firestore_clientc                 C   sd   t  }d| j d| j d| j }t j|d}|j|d}|j}|o1|jt j	j
jjt j	j
jjhv S )Nz	projects/z&/databases/(default)/collectionGroups/z/fields/)name)request)r   FirestoreAdminClientr|   r   _field_expiresGetFieldRequest	get_field
ttl_configstateField	TtlConfigStateACTIVECREATING)r;   re   r   r   fieldr   r%   r%   r@   r      s   


z+GCSBackend._is_firestore_ttl_policy_enabledc                    s4   |  |d  }| |d t j||fi |S )Nr   iQ )get_key_for_chorddecode_expire_chord_keyr&   _apply_chord_incr)r;   header_result_argsbodyr<   rG   r>   r%   r@   r      s   zGCSBackend._apply_chord_incrrG   returnc              
   C   sF   |  |}|j| jtdidtjttdddddd}|j	d j
S )	Nr   Tg      ?g     f@g       @)	predicateinitialmaximum
multipliertimeout)merger   r   )_firestore_documentrN   _field_countr   	Incrementr   Retryr   r   transform_resultsinteger_value)r;   rG   docrespr%   r%   r@   incr   s   
zGCSBackend.incrc                 K   s  | j }|j}|s
dS | |}| |}|jd}	|	du r.| ||}
|
du r*dS t|
}	||	kr:t	d| dS ||	kr| ||}
|
du rKdS t
|j|d}|
j}zzt  ||jjdd}W d   n1 smw   Y  W n= ty } z1zt|
 }d||}W n ty   t|}Y nw td|| | |t| W Y d}~n1d}~ww z|| W n0 ty } ztd|| | |td	| W Y d}~nd}~ww W |
  | | dS W |
  | | dS W |
  | | dS |
  | | w dS )
a~  Chord part return callback.

        Called for each task in the chord.
        Increments the counter stored in Firestore.
        If the counter reaches the number of tasks in the chord, the callback
        is called.
        If the callback raises an exception, the chord is marked as errored.
        If the callback returns a value, the chord is marked as successful.
        N
chord_sizez/Chord counter incremented too many times for %rr,   T)r   	propagatezDependency {0.id} raised {1!r}Chord %r raised: %rzCallback error: )r,   groupr   r   chordr1   _restore_depslenloggerwarningr
   join_nativer   r-   result_chord_join_timeout	Exceptionnext_failed_join_reportformatStopIterationrepr	exceptionchord_error_from_stackr   delayrP   _delete_chord_key)r;   r   r   resultr<   r,   gidrG   valsizedepscallbackjretexcculpritreasonr%   r%   r@   on_chord_part_return   s   



zGCSBackend.on_chord_part_returnc              
   C   s   | j }z	tj|| d}W n, ty8 } z t|j|d}td|| | |t	d| W Y d }~d S d }~ww |d u rozt
| t
yn } z!t|j|d}td|| | |t	d| d W Y d }~|S d }~ww |S )N)backendr   r   zCannot restore group: zChord callback %r raised: %rzGroupResult z no longer exists)r,   r   restorer   r
   r   r   r   r   r   
ValueError)r;   r   r   r,   r   r   r   r%   r%   r@   r   7  s6   zGCSBackend._restore_depsc                 C   s   |  |}|  d S rQ   )r   rP   )r;   rG   r   r%   r%   r@   r   O  s   
zGCSBackend._delete_chord_keyc                 C   s4   t  t|d }| |}|j| j|idd dS )zSet TTL policy for a Firestore document.

        Firestore ttl data is typically deleted within 24 hours after its
        expiration date.
        rH   T)r   N)r   rJ   r   r   rN   r   )r;   rG   expiresval_expiresr   r%   r%   r@   r   S  s   
zGCSBackend._expire_chord_keyc                 C   s   | j | jt|S rQ   )r   
collectionr   documentr   )r;   rG   r%   r%   r@   r   ]  s
   
zGCSBackend._firestore_document)rs   rt   ru   rv   implements_incrsupports_native_joinr   r   r   r'   rw   r   r   r   bytesr6   r   r   r   r   r   r   rx   r%   r%   r>   r@   r      s$    
A
r   )0rv   concurrent.futuresr   r   r   osr   	threadingr   kombu.utils.encodingr   kombu.utils.functionalr   kombu.utils.urlr	   celery.canvasr
   celery.exceptionsr   r   celery.resultr   r   celery.utils.logr   baser   r\   google.api_corer   google.api_core.exceptionsr   google.api_core.retryr   google.cloudr   google.cloud.storager   google.cloud.storage.retryr   ImportErrorr   r   __all__rs   r   r   r   r%   r%   r%   r@   <module>   sD    v