o
    h1                     @   sv  d Z ddlZddlZddlZddlmZ ddlmZmZm	Z	m
Z
 ddlZddlmZmZ ddlmZmZ ddlmZ ddlmZ ejd	d
Zedi dZedddhdZeddhdZG dd dejZeddeddddfddZeddededdfdedede de
e ef de	e  dede!d e"d!eej fd"d#Z#eddedfd$d%Z$dede
e ef de d!dfd&d'Z%dS )(z'Embedded workers for integration tests.    N)contextmanager)AnyIterableOptionalUnion)Celeryworker)_set_task_join_will_blockallow_join_result)Signal)anon_nodenameWORKER_LOGLEVELerrortest_worker_starting)nameproviding_argstest_worker_startedr   consumertest_worker_stoppedc                       sX   e Zd ZdZdZdZ fddZG dd dejj	Z	 fdd	Z
d
d Zdd Z  ZS )TestWorkControllerz3Worker that can synchronize on being fully started.FNc                    s   t  | _t j|i | | jjdd dkrPddlm	} | | _
t | _zddlm} |  W n	 ty=   Y nw tj| j
t | _| j  d S d S )N.preforkr   )Queue)pickling_support)	threadingEvent_on_startedsuper__init__pool_cls
__module__splitbilliardr   logger_queueosgetpidpidtblibr   installImportErrorlogginghandlersQueueListener	getLoggerqueue_listenerstart)selfargskwargsr   r   	__class__ _/var/www/html/optinet_system/venv/lib/python3.10/site-packages/celery/contrib/testing/worker.pyr   '   s   

zTestWorkController.__init__c                   @   s   e Zd Zdd Zdd ZdS )zTestWorkController.QueueHandlerc                 C   s
   d|_ |S )NT)
from_queuer1   recordr6   r6   r7   prepare>   s   z'TestWorkController.QueueHandler.preparec                 C   s   t jr d S )N)r+   raiseExceptionsr9   r6   r6   r7   handleErrorC   s   z+TestWorkController.QueueHandler.handleErrorN)__name__r!   __qualname__r;   r=   r6   r6   r6   r7   QueueHandler=   s    r@   c                    s@    j r  j }| fdd t }|| t  S )Nc                    s   | j  jkot| dd S )Nr8   F)processr'   getattr)rr1   r6   r7   <lambda>J   s    z*TestWorkController.start.<locals>.<lambda>)r$   r@   	addFilterr+   r.   
addHandlerr   r0   )r1   handlerloggerr4   rD   r7   r0   G   s   

zTestWorkController.startc                 C   s    | j   tj| j| |d dS )z=Callback called when the Consumer blueprint is fully started.)senderr   r   N)r   setr   sendapp)r1   r   r6   r6   r7   on_consumer_readyO   s   

z$TestWorkController.on_consumer_readyc                 C   s   | j   dS )zWait for worker to be fully up and running.

        Warning:
            Worker must be started within a thread for this to work,
            or it will block forever.
        N)r   waitrD   r6   r6   r7   ensure_startedV   s   z!TestWorkController.ensure_started)r>   r!   r?   __doc____test__r$   r   r+   r,   r@   r0   rN   rP   __classcell__r6   r6   r4   r7   r      s    
r      soloTg      $@c              
   k   s    t j| d d}	z]t| f||||||d|2}	|rAddlm}
 t  |
 j|ddks2J W d   n1 s<w   Y  |	V  W d   n1 sNw   Y  W tj| |	d dS W tj| |	d dS tj| |	d w )	z[Start embedded worker.

    Yields:
        celery.app.worker.Worker: worker instance.
    )rJ   N)concurrencypoolloglevellogfileperform_ping_checkshutdown_timeoutrT   )ping)timeoutpong)rJ   r   )	r   rL   _start_worker_threadtasksr\   r
   delaygetr   )rM   rV   rW   rX   rY   rZ   ping_task_timeoutr[   r3   r   r\   r6   r6   r7   start_workera   s2   "rd   rM   rV   rW   rX   rY   WorkControllerrZ   r[   returnc                 k   s.   t | || |rd| jv sJ | jtjdd}	|	jj W d   n1 s)w   Y  |d| ||dt	 |||d|ddddd
|}
t
j|
jdd	}|  |
  td
 z|
V  W ddlm} d|_|| | rxtdd|_dS ddlm} d|_|| | rtdd|_w )zaStart Celery worker in a thread.

    Yields:
        celery.worker.Worker: worker instance.
    zcelery.pingTEST_BROKER)hostnameNrh   without_heartbeatT)
rM   rV   rh   rW   rX   rY   ready_callbackri   without_minglewithout_gossip)targetdaemonFr   )statezWorker thread failed to exit within the allocated timeout. Consider raising `shutdown_timeout` if your tasks take longer to execute.r6   )setup_app_for_workerr`   
connectionr%   environrb   default_channelqueue_declarepopr   r   Threadr0   rP   r	   celery.workerro   should_terminatejoinis_aliveRuntimeError)rM   rV   rW   rX   rY   re   rZ   r[   r3   connr   tro   r6   r6   r7   r_      sV   




r_   c           	      k   sP    ddl m}m} |   ||dg}|  z
dV  W |  dS |  w )zfStart worker in separate process.

    Yields:
        celery.app.worker.Worker: worker instance.
    r   )ClusterNodeztestworker1@%hN)celery.apps.multir~   r   set_currentr0   stopwait)	rM   rV   rW   rX   rY   r3   r~   r   clusterr6   r6   r7   _start_worker_process   s   r   c                 C   s8   |    |   |   dt| j_| jj||d dS )z9Setup the app to be used for starting an embedded worker.F)rX   rY   N)finalizer   set_defaulttypelog_setupsetup)rM   rX   rY   r6   r6   r7   rp      s
   rp   )&rQ   r+   r%   r   
contextlibr   typingr   r   r   r   celery.worker.consumerceleryr   r   celery.resultr	   r
   celery.utils.dispatchr   celery.utils.nodenamesr   rr   rb   r   r   r   r   re   r   rd   intstrboolfloatr_   r   rp   r6   r6   r6   r7   <module>   s    C'
7&