o
    h
                     @  sx   d Z ddlmZ ddlmZmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ d	Ze	eZejZG d
d dejZdS )zWorker Task Consumer Bootstep.    )annotations)QoSignore_errors)	bootsteps)
get_logger)detect_quorum_queues   )Mingle)Tasksc                      sP   e Zd ZdZefZ fddZdd Zdd Zdd	 Z	d
d Z
dddZ  ZS )r
   z,Bootstep starting the task message consumer.c                   s$   d  |_ |_t j|fi | d S )N)task_consumerqossuper__init__)selfckwargs	__class__ ^/var/www/html/optinet_system/venv/lib/python3.10/site-packages/celery/worker/consumer/tasks.pyr      s   zTasks.__init__c                   s^       |   jjd j  jjj j j	d _
 fdd}t| j _dS )zStart task consumer.r   )on_decode_errorc                   s    j j| dS )N)prefetch_countapply_global)r   r   )r   r   
qos_globalr   r   set_prefetch_count,   s   z'Tasks.start.<locals>.set_prefetch_countN)update_strategiesr   
connectiondefault_channel	basic_qosinitial_prefetch_countappamqpTaskConsumerr   r   r   r   )r   r   r   r   r   r   start   s   
zTasks.startc                 C  s$   |j rtd t||j j dS dS )zStop task consumer.zCanceling task consumer...N)r   debugr   cancelr   r   r   r   r   stop3   s   z
Tasks.stopc                 C  s4   |j r| | td t||j j d|_ dS dS )zShutdown task consumer.zClosing consumer channel...N)r   r(   r%   r   closer'   r   r   r   shutdown9   s   

zTasks.shutdownc                 C  s   d|j r	|j jiS diS )zReturn task consumer info.r   zN/A)r   valuer'   r   r   r   infoA   s   z
Tasks.inforeturnboolc                 C  s@   |j j }|jjjrt|j|j jj\}}|rd}t	d |S )zDetermine if global QoS should be applied.

        Additional information:
            https://www.rabbitmq.com/docs/consumer-prefetch
            https://www.rabbitmq.com/docs/quorum-queues#global-qos
        Fz5Global QoS is disabled. Prefetch count in now static.)
r   qos_semantics_matches_specr!   confworker_detect_quorum_queuesr   	transportdriver_typeloggerr,   )r   r   r   using_quorum_queuesqnamer   r   r   r   E   s   



zTasks.qos_global)r-   r.   )__name__
__module____qualname____doc__r	   requiresr   r$   r(   r*   r,   r   __classcell__r   r   r   r   r
      s    r
   N)r:   
__future__r   kombu.commonr   r   celeryr   celery.utils.logr   celery.utils.quorum_queuesr   mingler	   __all__r7   r4   r%   StartStopStepr
   r   r   r   r   <module>   s    