o
    hM                     @   s  d Z ddlZddlZddlmZmZmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZmZ ddlmZ ddlmZ ddlmZ dZdZ ee!Z"edg dZ#dd Z$dd Z%G dd deZ&dd Z'dd Z(e( dd Z)e(d d!d"efgd#dd%d&Z*d'd( Z+e(d)d*d+d,d- Z,ej-j.fd.d/Z/ej0j1ej2j1fd0d1Z3e'd2d*d+dd3d4Z4e'd5d6d+dd7d8Z5dd9d:Z6e'd2d;e7fgd<d=d>d? Z8e'd@e7fdAe7fgdBdCdDdA Z9e'd@e7fdEe:fdFe:fgdGdCddHdIZ;e( dJdK Z<e' ddLdMZ=e' dNdO Z>e' dPdQ Z?e' dRdS Z@e(d$dTddUdVZAe(dWdXdYdZ ZBe( d[d\ ZCe(d]d^d_d` ZDdadb ZEe(dcd^ddde ZFe(dfd^ddgdhZGe(did^djdk ZHe(dldmdndoddpdqZIe(drdse7fdteJfdueJfgdvdwdd{d|ZKe( d}d~ ZLe(deJfgddCdddZMe'deJfgddCdddZNe'deJfgddCdddZOe' dddZPe'deJfdeJfgddCdddZQe' dddZRe'de7fde7fde7fde7fgddC		dddZSe'de7fgddCdd ZTe( dd ZUdS )z.Worker remote control command implementations.    N)UserDictdefaultdict
namedtuple)TERM_SIGNAME)	safe_repr)WorkerShutdown)EX_OK)signals)
maybe_list)
get_logger)jsonify	strtobool)rate   state)Request)Panel)exchangerouting_key
rate_limitcontroller_info_t)aliastypevisibledefault_timeouthelp	signatureargsvariadicc                 C      d| iS )Nok valuer"   r"   W/var/www/html/optinet_system/venv/lib/python3.10/site-packages/celery/worker/control.pyr!         r!   c                 C   r    )Nerrorr"   r#   r"   r"   r%   nok#   r&   r(   c                   @   s8   e Zd ZdZi Zi Zedd Ze			d
dd	ZdS )r   z+Global registry of remote control commands.c                 O   s(   |r| j di || S | j di |S )Nr"   )	_register)clsr   kwargsr"   r"   r%   register-   s   zPanel.registerNcontrolT      ?c
              
      s"    	f
dd}
|
S )Nc              	      s^   p| j }p| jpd dd }| j|< t 	|j|<  r-| j < | S )N 
r   )__name____doc__stripsplitdatar   meta)funcontrol_name_help
r   r   r*   r   r   namer   r   r   r   r"   r%   _inner8   s   



zPanel._register.<locals>._innerr"   )r*   r;   r   r   r   r   r   r   r   r   r<   r"   r:   r%   r)   3   s   
zPanel._register)	NNr-   Tr.   NNNN)	r1   
__module____qualname__r2   r5   r6   classmethodr,   r)   r"   r"   r"   r%   r   '   s    
r   c                  K      t jdddi| S )Nr   r-   r"   r   r,   r+   r"   r"   r%   control_commandE      rC   c                  K   r@   )Nr   inspectr"   rA   rB   r"   r"   r%   inspect_commandI   rD   rF   c                 C   s   t | j S )z6Information about Celery installation for bug reports.)r!   app	bugreportr   r"   r"   r%   reportO      rI   	dump_confz[include_defaults=False]with_defaults)r   r   r   Fc                 K   s   t | jjj|dttdS )zList configuration.)rL   )	keyfilterunknown_type_filter)r   rG   conftable_wanted_config_keyr   )r   rL   r+   r"   r"   r%   rO   U   s   rO   c                 C   s   t | to
| d S )N__)
isinstancestr
startswith)keyr"   r"   r%   rQ   a   s   rQ   idsz[id1 [id2 [... [idN]]]])r   r   c                 K   s   dd t t|D S )z!Query for task information by id.c                 S   s    i | ]}|j t|| fqS r"   )id_state_of_taskinfo).0reqr"   r"   r%   
<dictcomp>m   s    zquery_task.<locals>.<dictcomp>)_find_requests_by_idr
   )r   rW   r+   r"   r"   r%   
query_taskg   s   
r_   c              	   c   s0    | D ]}z||V  W q t y   Y qw d S N)KeyError)rW   get_requesttask_idr"   r"   r%   r^   s   s   r^   c                 C   s   || rdS || rdS dS )Nactivereservedreadyr"   )request	is_activeis_reservedr"   r"   r%   rY   |   s
   rY   rc   c                 K   sR   t t|pg d}}t| |||fi |}t|tr!d|v r!|S td| dS )zRevoke task by task id (or list of ids).

    Keyword Arguments:
        terminate (bool): Also terminate the process if the task is active.
        signal (str): Name of signal to use for terminate (e.g., ``KILL``).
    Nr!   ztasks z flagged as revoked)setr
   _revokerS   dictr!   )r   rc   	terminatesignalr+   task_idsr"   r"   r%   revoke   s
   rp   headersz/[key1=value1 [key2=value2 [... [keyN=valueN]]]]c                 K   s,  t |pt}t|trdd |D }| D ]\}}ttj	|p#g tt| }|tj|< q|s;t
d| dS ttj}	tt}
|	D ]=}t|dr|jr| D ].\}}||jv rt|}t|j| }t|t|@ }|r|
| | |j| jj|d qTqF|
st
d| dS t
d|
 dS )	a  Revoke task by header (or list of headers).

    Keyword Arguments:
        headers(dictionary): Dictionary that contains stamping scheme name as keys and stamps as values.
                             If headers is a list, it will be converted to a dictionary.
        terminate (bool): Also terminate the process if the task is active.
        signal (str): Name of signal to use for terminate (e.g., ``KILL``).
    Sample headers input:
        {'mtask_id': [id1, id2, id3]}
    c                 S   s&   i | ]}| d d | d d qS )=r   r   )r4   )r[   hr"   r"   r%   r]      s   & z-revoke_by_stamped_headers.<locals>.<dictcomp>zheaders z' flagged as revoked, but not terminatedstampsrn   z were not terminatedz revoked)_signalssignumr   rS   listitemsr
   worker_staterevoked_stampsgetr!   active_requestsr   rj   hasattrrt   updaterm   consumerpool)r   rq   rm   rn   r+   rw   headerrt   updated_stampsr}   #terminated_scheme_to_stamps_mappingr\   expected_header_keyexpected_header_valueactual_headermatching_stamps_for_requestr"   r"   r%   revoke_by_stamped_headers   s0   
 

r   c           
      K   s   t |}t }tj| |rQt|pt}t|D ]&}|j	|vr@|
|j	 td|j	| |j| jj|d t ||kr@ nq|sGtdS tdd|S d|}	td|	 |S )NzTerminating %s (%s)ru   zterminate: tasks unknownzterminate: {}z, zTasks flagged as revoked: %s)lenrj   rz   revokedr   rv   rw   r   r^   rX   addloggerrZ   rm   r   r   r!   formatjoin)
r   ro   rm   rn   r+   size
terminatedrw   rg   idstrr"   r"   r%   rk      s&   

rk   rn   z <signal> [id1 [id2 [... [idN]]]])r   r   r   c                 K   s   t | |d|dS )z+Terminate task by task id (or list of ids).T)rm   rn   )rp   )r   rn   rc   r+   r"   r"   r%   rm      s   rm   	task_namer   z0<task_name> <rate_limit (e.g., 5/s | 5/m | 5/h)>)r   r   c              
   K   s   zt | W n ty } ztd|W  Y d}~S d}~ww z	|| jj| _W n ty>   tjd|dd td Y S w | j	
  |sPtd| tdS td	|| td
S )zTell worker(s) to modify the rate limit for a task by type.

    See Also:
        :attr:`celery.app.task.Task.rate_limit`.

    Arguments:
        task_name (str): Type of task to set rate limit for.
        rate_limit (int, str): New rate limit.
    zInvalid rate limit string: Nz&Rate limit attempt for unknown task %sTexc_infounknown taskz)Rate limits disabled for tasks of type %sz rate limit disabled successfullyz(New rate limit for tasks of type %s: %s.znew rate limit set successfully)r   
ValueErrorr(   rG   tasksr   ra   r   r'   r   reset_rate_limitsrZ   r!   )r   r   r   r+   excr"   r"   r%   r      s,   
softhardz#<task_name> <soft_secs> [hard_secs]c                 K   s`   z| j j| }W n ty   tjd|dd td Y S w ||_||_td||| t	dS )zTell worker(s) to modify the time limit for task by type.

    Arguments:
        task_name (str): Name of task to change.
        hard (float): Hard time limit.
        soft (float): Soft time limit.
    z-Change time limit attempt for unknown task %sTr   r   z5New time limits for tasks of type %s: soft=%s hard=%sztime limits set successfully)
rG   r   ra   r   r'   r(   soft_time_limit
time_limitrZ   r!   )r   r   r   r   r+   taskr"   r"   r%   r      s   r   c                 K   s   d| j jjiS )z Get current logical clock value.clock)rG   r   r$   r   r+   r"   r"   r%   r   >  rJ   r   c                 K   s"   | j jr| j j||| dS dS )zHold election.

    Arguments:
        id (str): Unique election id.
        topic (str): Election topic.
        action (str): Action to take for elected actor.
    N)r   gossipelection)r   rX   topicactionr+   r"   r"   r%   r   D  s   	r   c                 C   s>   | j j}|jrd|jvr|jd td tdS tdS )z+Tell worker(s) to send task-related events.r   z)Events of group {task} enabled by remote.ztask events enabledztask events already enabled)r   event_dispatchergroupsr   r   rZ   r!   r   
dispatcherr"   r"   r%   enable_eventsQ  s   
r   c                 C   s8   | j j}d|jv r|jd td tdS tdS )z3Tell worker(s) to stop sending task-related events.r   z*Events of group {task} disabled by remote.ztask events disabledztask events already disabled)r   r   r   discardr   rZ   r!   r   r"   r"   r%   disable_events\  s   

r   c                 C   s,   t d | jj}|jddditj dS )z3Tell worker(s) to send event heartbeat immediately.zHeartbeat requested by remote.worker-heartbeatfreq   N)r   )r   debugr   r   sendrz   SOFTWARE_INFOr   r"   r"   r%   	heartbeatg  s   
r   )r   c                 K   sJ   || j kr#td| |rtj| tj  tjj| jj	
 dS dS )zRequest mingle sync-data.zsync with %s)r   r   N)hostnamer   rZ   rz   r   r   purge_datarG   r   forward)r   	from_noder   r+   r"   r"   r%   helloq  s   


r   g?)r   c                 K   s   t dS )zPing worker(s).pong)r!   r   r"   r"   r%   ping  s   r   c                 K   s   | j j S )z&Request worker statistics/information.)r   
controllerstatsr   r"   r"   r%   r     s   r   dump_schedule)r   c                 K   s   t t| jjS )z0List of currently scheduled ETA/countdown tasks.)rx   _iter_schedule_requestsr   timerr   r"   r"   r%   	scheduled  s   r   c              
   c   sj    | j jD ]-}z|jjd }W n ttfy   Y qw t|tr2|jr(|j	 nd |j
| dV  qd S )Nr   )etapriorityrg   )schedulequeueentryr   
IndexError	TypeErrorrS   r   r   	isoformatr   rZ   )r   waitingarg0r"   r"   r%   r     s   
r   dump_reservedc                 K   s.   |  tj|  tj }|sg S dd |D S )zAList of currently reserved tasks, not including scheduled/active.c                 S   s   g | ]}|  qS r"   rZ   r[   rg   r"   r"   r%   
<listcomp>  s    zreserved.<locals>.<listcomp>)tsetrz   reserved_requestsr}   )r   r+   reserved_tasksr"   r"   r%   re     s   

re   dump_activec                    s    fdd|  tjD S )z'List of tasks currently being executed.c                    s   g | ]}|j  d qS )safer   r   r   r"   r%   r     s    zactive.<locals>.<listcomp>)r   rz   r}   )r   r   r+   r"   r   r%   rd     s   

rd   dump_revokedc                 K   s
   t tjS )zList of revoked task-ids.)rx   rz   r   r   r"   r"   r%   r     s   
r   
dump_taskstaskinfoitemsz[attr1 [attr2 [... [attrN]]]])r   r   r   c                    sJ   | j jpt|rndd D }fdd  fddt|D S )zList of registered tasks.

    Arguments:
        taskinfoitems (Sequence[str]): List of task attributes to include.
            Defaults to ``exchange,routing_key,rate_limit``.
        builtins (bool): Also include built-in tasks.
    c                 s   s    | ]
}| d s|V  qdS )zcelery.N)rU   r[   r   r"   r"   r%   	<genexpr>  s    

zregistered.<locals>.<genexpr>c                    sB    fddD }|rdd |  D }d jd|S  jS )Nc                    s.   i | ]}t  |d d ur|tt  |d qS r`   )getattrrT   )r[   fieldr   r"   r%   r]     s
    z5registered.<locals>._extract_info.<locals>.<dictcomp>c                 S   s   g | ]}d  |qS )rr   )r   )r[   fr"   r"   r%   r     s    z5registered.<locals>._extract_info.<locals>.<listcomp>z{} [{}] )ry   r   r;   r   )r   fieldsrZ   )r   r   r%   _extract_info  s   
z!registered.<locals>._extract_infoc                    s   g | ]} | qS r"   r"   r   )r   regr"   r%   r     s    zregistered.<locals>.<listcomp>)rG   r   DEFAULT_TASK_INFO_ITEMSsorted)r   r   builtinsr+   r   r"   )r   r   r   r%   
registered  s   
r   g      N@r   num	max_depthz.[object_type=Request] [num=200 [max_depth=10]])r   r   r      
   r   c                    s   zddl }W n ty   tdw td| tjdddd$}||d|  |j | fd	d
|jd d|jiW  d   S 1 sGw   Y  dS )a  Create graph of uncollected objects (memory-leak debugging).

    Arguments:
        num (int): Max number of objects to graph.
        max_depth (int): Traverse at most n levels deep.
        type (str): Name of object to graph.  Default is ``"Request"``.
    r   NzRequires the objgraph libraryzDumping graph for type %rcobjgz.pngF)prefixsuffixdeletec                    s   |  v S r`   r"   )vobjectsr"   r%   <lambda>  s    zobjgraph.<locals>.<lambda>)r   	highlightfilenamer   )	objgraphImportErrorr   rZ   tempfileNamedTemporaryFileby_typeshow_backrefsr;   )r   r   r   r   	_objgraphfhr"   r   r%   r     s$   $r   c                 K   s   ddl m} | S )z Sample current RSS memory usage.r   )
sample_mem)celery.utils.debugr   )r   r+   r   r"   r"   r%   	memsample  s   r   samplesz[n_samples=10]c                 K   s(   ddl m} t }|j|d | S )z/Dump statistics of previous memsample requests.r   )r   )file)celery.utilsr   ioStringIOmemdumpgetvalue)r   r   r+   r   outr"   r"   r%   r    s   r  nz[N=1]c                 K   s4   | j jjr	tdS | j j| | j | tdS )z!Grow pool by n processes/threads.zJpool_grow is not supported with autoscale. Adjust autoscale range instead.zpool will grow)r   r   
autoscalerr(   r   grow_update_prefetch_countr!   r   r  r+   r"   r"   r%   	pool_grow  s
   
r  c                 K   s6   | j jjr	tdS | j j| | j |  tdS )z#Shrink pool by n processes/threads.zLpool_shrink is not supported with autoscale. Adjust autoscale range instead.zpool will shrink)r   r   r	  r(   r   shrinkr  r!   r  r"   r"   r%   pool_shrink  s
   
r  c                 K   s.   | j jjr| jjj|||d tdS td)zRestart execution pool.)reloaderzreload startedzPool restarts not enabled)rG   rO   worker_pool_restartsr   r   reloadr!   r   )r   modulesr  r  r+   r"   r"   r%   pool_restart-  s   
r  maxminz[max [min]]c                 C   s:   | j jj}|r|||\}}td| d| S td)zModify autoscale settings.zautoscale now max=z min=zAutoscale not enabled)r   r   r	  r   r!   r   )r   r  r  r	  max_min_r"   r"   r%   	autoscale7  s
   
r  Got shutdown from remotec                 K   s   t | tt)zShutdown worker(s).)r   warningr   r   )r   msgr+   r"   r"   r%   shutdownD  s   
r  r   r   exchange_typer   z'<queue> [exchange [type [routing_key]]]c                 K   s2   | j j| j j|||pd|fi | td| S )z2Tell worker(s) to consume from task queue by name.directzadd consumer )r   	call_soonadd_task_queuer!   )r   r   r   r  r   optionsr"   r"   r%   add_consumerM  s   r#  z<queue>c                 K   s    | j | j j| td| S )z9Tell worker(s) to stop consuming from task queue by name.zno longer consuming from )r   r   cancel_task_queuer!   )r   r   _r"   r"   r%   cancel_consumer_  s   r&  c                 C   s    | j jrdd | j jjD S g S )z:List the task queues a worker is currently consuming from.c                 S   s   g | ]
}t |jd dqS )T)recurse)rl   as_dict)r[   r   r"   r"   r%   r   o  s    z!active_queues.<locals>.<listcomp>)r   task_consumerqueuesr   r"   r"   r%   active_queuesk  s
   r+  )F)FN)NNNr`   )NF)r   r   r   )r   )r   )NFN)NN)r  )Vr2   r  r   collectionsr   r   r   billiard.commonr   kombu.utils.encodingr   celery.exceptionsr   celery.platformsr   r	   rv   celery.utils.functionalr
   celery.utils.logr   celery.utils.serializationr   r   celery.utils.timer   r/   r   rz   rg   r   __all__r   r1   r   r   r!   r(   r   rC   rF   rI   rO   rQ   r_   requests__getitem__r^   r}   __contains__r   rY   rp   r   rk   rT   rm   r   floatr   r   r   r   r   r   r   r   r   r   r   re   rd   r   r   intr   r   r  r  r  r  r  r  r#  r&  r+  r"   r"   r"   r%   <module>   s.   

	




6

$





	








				
