o
    h=                     @   s>  d Z ddlZddlZddlmZmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZmZmZ ddlmZmZ ddlmZ ddlmZ ddlmZ ddl m!Z!m"Z" ddl#m$Z$ ddl%m&Z& ddl'm(Z( zddl)Z)W n e*y   dZ)Y nw dZ+dZ,dZ-dZ.G dd dZ/dS )a  WorkController can be used to instantiate in-process workers.

The command-line interface for the worker is in :mod:`celery.bin.worker`,
while the worker program is in :mod:`celery.apps.worker`.

The worker program is responsible for adding signal handlers,
setting up logging, etc.  This is a bare-bones worker without
global side-effects (i.e., except for the global state stored in
:mod:`celery.worker.state`).

The worker consists of several components, all managed by bootsteps
(mod:`celery.bootsteps`).
    N)datetimetimezone)sleep)	cpu_count)detect_environment)	bootsteps)concurrency)signals)RUN	TERMINATE)ImproperlyConfiguredTaskRevokedErrorWorkerTerminate)
EX_FAILUREcreate_pidlock)reload_from_cwd)mlevel)worker_logger)default_nodenameworker_direct)str_to_list)default_socket_timeout   state)WorkControllerg      @z
Trying to select queue subset of {0!r}, but queue {1} isn't
defined in the `task_queues` setting.

If you want to automatically declare unknown queues you can
enable the `task_create_missing_queues` setting.
ze
Trying to deselect queue subset of {0!r}, but queue {1} isn't
defined in the `task_queues` setting.
c                   @   s  e Zd ZdZdZdZdZdZdZdZ	G dd de
jZdJddZ		dKddZd	d
 Zdd Zdd Zdd Zdd Zdd Zdd ZdLddZdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* ZdMd,d-ZdNd.d/Z dOd1d2Z!dPd3d4Z"dLd5d6Z#dMd7d8Z$d9d: Z%d;d< Z&d=d> Z'd?d@ Z(dAdB Z)e*dCdD Z+																					dQdFdGZ,dHdI Z-dS )Rr   zUnmanaged worker instance.Nc                   @   s   e Zd ZdZdZh dZdS )zWorkController.BlueprintzWorker bootstep blueprint.Worker>   celery.worker.components:Hubcelery.worker.components:Beatcelery.worker.components:Poolcelery.worker.components:Timer celery.worker.components:StateDB!celery.worker.components:Consumer'celery.worker.autoscale:WorkerComponentN)__name__
__module____qualname____doc__namedefault_steps r*   r*   V/var/www/html/optinet_system/venv/lib/python3.10/site-packages/celery/worker/worker.py	BlueprintL   s    r,   c                 K   s   |p| j | _ t|| _ttj| _| j j	  | j
di | | jdi | | jdi | | jdi | jdi | d S )Nr*   )appr   hostnamer   nowr   utcstartup_timeloaderinit_workeron_before_initsetup_defaultson_after_initsetup_instanceprepare_args)selfr-   r.   kwargsr*   r*   r+   __init__Z   s   
 zWorkController.__init__c                 K   s   || _ | || | t| | js&zt | _W n ty%   d| _Y nw t| j| _|p0| j	| _
| j | _|d u r@|  n|| _|| _tjj| d t| j| _g | _|   | j| jjd | j| j| jd| _| jj| fi | d S )N   senderworker)stepson_starton_close
on_stopped)pidfilesetup_queuessetup_includesr   r   r   NotImplementedErrorr   loglevelon_consumer_readyready_callbackr-   connection_for_read	_conninfoshould_use_eventloopuse_eventloopoptionsr	   worker_initsend_concurrencyget_implementationpool_clsr@   on_init_blueprintr,   rA   rB   rC   	blueprintapply)r9   queuesrJ   rD   includerN   exclude_queuesr:   r*   r*   r+   r7   e   s6   

zWorkController.setup_instancec                 C      d S Nr*   r9   r*   r*   r+   rU         z WorkController.on_init_blueprintc                 K   r[   r\   r*   r9   r:   r*   r*   r+   r4      r^   zWorkController.on_before_initc                 K   r[   r\   r*   r_   r*   r*   r+   r6      r^   zWorkController.on_after_initc                 C   s   | j rt| j | _d S d S r\   )rD   r   pidlockr]   r*   r*   r+   rA      s   zWorkController.on_startc                 C   r[   r\   r*   )r9   consumerr*   r*   r+   rI      r^   z WorkController.on_consumer_readyc                 C   s   | j j  d S r\   )r-   r2   shutdown_workerr]   r*   r*   r+   rB      s   zWorkController.on_closec                 C   s,   | j   | j  | jr| j  d S d S r\   )timerstopra   shutdownr`   releaser]   r*   r*   r+   rC      s
   

zWorkController.on_stoppedc              
   C   s   t |}t |}z
| jjj| W n ty( } z
tt 	||d }~ww z
| jjj
| W n tyI } z
tt 	||d }~ww | jjjr\| jjjt| j d S d S r\   )r   r-   amqprX   selectKeyErrorr   SELECT_UNKNOWN_QUEUEstripformatdeselectDESELECT_UNKNOWN_QUEUEconfr   
select_addr.   )r9   rY   excludeexcr*   r*   r+   rE      s*   
zWorkController.setup_queuesc                    sf   t  jjj}|r|t |7 } fdd|D  | _dd  jj D }t t||B  jj_d S )Nc                    s   g | ]	} j j|qS r*   )r-   r2   import_task_module.0mr]   r*   r+   
<listcomp>   s    z1WorkController.setup_includes.<locals>.<listcomp>c                 S   s   h | ]}|j jqS r*   )	__class__r%   )ru   taskr*   r*   r+   	<setcomp>   s    z0WorkController.setup_includes.<locals>.<setcomp>)tupler-   ro   rY   tasksvaluesset)r9   includesprevtask_modulesr*   r]   r+   rF      s   
zWorkController.setup_includesc                 K   s   |S r\   r*   r_   r*   r*   r+   r8      r^   zWorkController.prepare_argsc                 C   s   t jj| d d S )Nr=   )r	   worker_shutdownrQ   r]   r*   r*   r+   _send_worker_shutdown   s   z$WorkController._send_worker_shutdownc              
   C   s   z	| j |  W d S  ty   |   Y d S  ty7 } ztjd|dd | jtd W Y d }~d S d }~w t	yP } z| j|j
d W Y d }~d S d }~w ty_   | jtd Y d S w )NzUnrecoverable error: %rT)exc_info)exitcode)rV   startr   	terminate	Exceptionloggercriticalrd   r   
SystemExitcodeKeyboardInterrupt)r9   rr   r*   r*   r+   r      s   zWorkController.startc                 C   s   | j j| d|fdd d S )Nregister_with_event_loopzhub.register)argsdescription)rV   send_all)r9   hubr*   r*   r+   r      s   
z'WorkController.register_with_event_loopc                 C   s   |  | j|S r\   )_quick_acquire_process_taskr9   reqr*   r*   r+   _process_task_sem   s   z WorkController._process_task_semc                 C   sJ   z	| | j W dS  ty$   z|   W Y dS  ty#   Y Y dS w w )z2Process task by sending it to the pool of workers.N)execute_using_poolpoolr   _quick_releaseAttributeErrorr   r*   r*   r+   r      s   zWorkController._process_taskc                 C   s&   z| j   W d S  ty   Y d S w r\   )ra   closer   r]   r*   r*   r+   signal_consumer_close   s
   z$WorkController.signal_consumer_closec                 C   s    t  dko| jjjjo| jj S )Ndefault)r   rL   	transport
implementsasynchronousr-   
IS_WINDOWSr]   r*   r*   r+   rM      s
   

z#WorkController.should_use_eventloopFc                 C   sF   |dur|| _ | jjtkr|   |r| jjr| jdd |   dS )z7Graceful shutdown of the worker server (Warm shutdown).NTwarm)	r   rV   r   r
   r   r   signal_safe	_shutdownr   )r9   in_sighandlerr   r*   r*   r+   rd      s   zWorkController.stopc                 C   s8   | j jtkr|   |r| jjr| jdd dS dS dS )z>Not so graceful shutdown of the worker server (Cold shutdown).Fr   N)rV   r   r   r   r   r   r   )r9   r   r*   r*   r+   r      s   zWorkController.terminateTc                 C   sX   | j d ur*tt | j j| | d | j   W d    d S 1 s#w   Y  d S d S )N)r   )rV   r   SHUTDOWN_SOCKET_TIMEOUTrd   join)r9   r   r*   r*   r+   r     s   

"zWorkController._shutdownc                 C   sT   t | j|||d | jr| j  | j  z| j  W d S  ty)   Y d S w )N)force_reloadreloader)list_reload_modulesra   update_strategiesreset_rate_limitsr   restartrG   )r9   modulesreloadr   r*   r*   r+   r     s   

zWorkController.reloadc                    s4    fddt |d u rjjjD S |pdD S )Nc                 3   s"    | ]}j |fi  V  qd S r\   )_maybe_reload_modulert   r:   r9   r*   r+   	<genexpr>  s
    
z1WorkController._reload_modules.<locals>.<genexpr>r*   )r~   r-   r2   r   )r9   r   r:   r*   r   r+   r     s   
zWorkController._reload_modulesc                 C   sH   |t jvrtd| | jj|S |r"td| tt j| |S d S )Nzimporting module %szreloading module %s)sysr   r   debugr-   r2   import_from_cwdr   )r9   moduler   r   r*   r*   r+   r      s   
z#WorkController._maybe_reload_modulec                 C   s8   t tj| j }| jjt t	| j
jt| dS )N)totalpidclockuptime)r   r/   r   r0   r1   r   total_countosgetpidstrr-   r   roundtotal_seconds)r9   r   r*   r*   r+   info(  s   

zWorkController.infoc                 C   s   t d u rtdt t j}i d|jd|jd|jd|jd|jd|j	d|j
d	|jd
|jd|jd|jd|jd|jd|jd|jd|jS )Nz%rusage not supported by this platformutimestimemaxrssixrssidrssisrssminfltmajfltnswapinblockoublockmsgsndmsgrcvnsignalsnvcswnivcsw)resourcerG   	getrusageRUSAGE_SELFru_utimeru_stime	ru_maxrssru_ixrssru_idrssru_isrss	ru_minflt	ru_majfltru_nswap
ru_inblock
ru_oublock	ru_msgsnd	ru_msgrcvru_nsignalsru_nvcsw	ru_nivcsw)r9   sr*   r*   r+   rusage/  sH   	
zWorkController.rusagec                 C   s`   |   }|| j |  || jj | j z	|  |d< W |S  ty/   d|d< Y |S w )Nr   zN/A)r   updaterV   ra   r   rG   )r9   r   r*   r*   r+   statsF  s   
zWorkController.statsc                 C   s"   dj | | jr| j dS ddS )z``repr(worker)``.z#<Worker: {self.hostname} ({state})>INIT)r9   r   )rl   rV   human_stater]   r*   r*   r+   __repr__P  s   zWorkController.__repr__c                 C   s   | j S )z#``str(worker) == worker.hostname``.)r.   r]   r*   r*   r+   __str__W  s   zWorkController.__str__c                 C   s   t S r\   r   r]   r*   r*   r+   r   [  s   zWorkController.stateWARNc                 K   s  | j j}|| _|| _|d|| _|d|| _|d||| _|d|| _|d|| _|d|| _	|p2|| _
|d|	| _|d|
| _|d	|| _|d
||| _|d|| _|d||| _|d||| _|d||| _|d|| _|d|| _t|d|| _|d|| _|d|| _d S )Nworker_concurrencyworker_send_task_eventsworker_poolworker_consumerworker_timerworker_timer_precisionworker_autoscalerworker_pool_putlocksworker_pool_restartsworker_state_dbbeat_schedule_filenamebeat_schedulertask_time_limittask_soft_time_limitworker_max_tasks_per_childworker_max_memory_per_childworker_prefetch_multiplierworker_disable_rate_limitsworker_lost_wait)r-   eitherrH   logfiler   task_eventsrT   consumer_cls	timer_clstimer_precisionoptimizationautoscaler_clspool_putlockspool_restartsstatedbschedule_filename	scheduler
time_limitsoft_time_limitmax_tasks_per_childmax_memory_per_childintprefetch_multiplierdisable_rate_limitsr  )r9   r   rH   r  r  r   r  r  r	  r  r  r  r
  Or  r  r  r  rT   state_dbr   r   scheduler_clsr  r  r  r  r  r  _kwr  r*   r*   r+   r5   _  sN   
zWorkController.setup_defaultsc                 C   s^   | j }ttj}|jjrd}|jjdkr+|r-d|jj d}t| t	|jj dS dS dS )a  Wait :setting:`worker_soft_shutdown_timeout` if soft shutdown is enabled.

        To enable soft shutdown, set the :setting:`worker_soft_shutdown_timeout` in the
        configuration. Soft shutdown can be used to allow the worker to finish processing
        few more tasks before initiating a cold shutdown. This mechanism allows the worker
        to finish short tasks that are already in progress and requeue long-running tasks
        to be picked up by another worker.

        .. warning::
            If there are no tasks in the worker, the worker will not wait for the
            soft shutdown timeout even if it is set as it makes no sense to wait for
            the timeout when there are no tasks to process.
        Tr   z)Initiating Soft Shutdown, terminating in z secondsN)
r-   r{   r   active_requestsro   #worker_enable_soft_shutdown_on_idleworker_soft_shutdown_timeoutr   warningr   )r9   r-   requestslogr*   r*   r+   wait_for_soft_shutdown  s   

z%WorkController.wait_for_soft_shutdown)NN)NNNNNNr\   )FN)F)T)NFN)Nr   NNNNNNNNNNNNNNNNNNNNNNNNNN).r$   r%   r&   r'   r-   r`   rV   r   	semaphorer   r   r,   r;   r7   rU   r4   r6   rA   rI   rB   rC   rE   rF   r8   r   r   r   r   r   r   rM   rd   r   r   r   r   r   r   r   r   r   r   propertyr   r5   r"  r*   r*   r*   r+   r   ?   s    

(











=r   )0r'   r   r   r   r   timer   billiardr   kombu.utils.compatr   celeryr   r   rR   r	   celery.bootstepsr
   r   celery.exceptionsr   r   r   celery.platformsr   r   celery.utils.importsr   celery.utils.logr   r   r   celery.utils.nodenamesr   r   celery.utils.textr   celery.utils.threadsr    r   r   ImportError__all__r   rj   rn   r   r*   r*   r*   r+   <module>   s<    