o
    	h                     @  s  d Z ddlmZ ddlZddlZddlZddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZmZ ddlmZ ddlmZ ddlmZ ddlmZmZm Z  ddl!m"Z" ddl#m$Z$m%Z% ddl&m'Z' ddl(m)Z) ddl*m+Z+ ddl,m-Z- zddl.Z.eededkZ/W n e0y   dZ.dZ/Y nw zddl.m1Z1 W n e0y   dZ1Y nw edZ2e2j3e2j4Z5Z4dZ6dZ7dZ8g dZ9edd Z:d!d" Z;d#d$ Z<G d%d& d&e=Z>e
d'd( Z?d)d* Z@G d+d, d,ZAG d-d. d.eAe.jBZCG d/d0 d0eAe.jDjEZFG d1d2 d2e.jDjGZHG d3d4 d4e-jIZIG d5d6 d6ZJG d7d8 d8e-jKZKG d9d: d:e-jLZLe1rJG d;d< d<e1jMe.jNZOG d=d> d>eKZPG d?d@ d@eLZQdS )Aa  Redis transport module for Kombu.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: No

Connection String
=================
Connection string has the following format:

.. code-block::

    redis://[USER:PASSWORD@]REDIS_ADDRESS[:PORT][/VIRTUALHOST]
    rediss://[USER:PASSWORD@]REDIS_ADDRESS[:PORT][/VIRTUALHOST]

To use sentinel for dynamic Redis discovery,
the connection string has following format:

.. code-block::

    sentinel://[USER:PASSWORD@]SENTINEL_ADDRESS[:PORT]

Transport Options
=================
* ``sep``
* ``ack_emulation``: (bool) If set to True transport will
  simulate Acknowledge of AMQP protocol.
* ``unacked_key``
* ``unacked_index_key``
* ``unacked_mutex_key``
* ``unacked_mutex_expire``
* ``visibility_timeout``
* ``unacked_restore_limit``
* ``fanout_prefix``
* ``fanout_patterns``
* ``global_keyprefix``: (str) The global key prefix to be prepended to all keys
  used by Kombu
* ``socket_timeout``
* ``socket_connect_timeout``
* ``socket_keepalive``
* ``socket_keepalive_options``
* ``queue_order_strategy``
* ``max_connections``
* ``health_check_interval``
* ``retry_on_timeout``
* ``priority_steps``
    )annotationsN)bisect)
namedtuple)contextmanager)version)Empty)time)Version)promise)InconsistencyErrorVersionMismatch)
get_logger)register_after_fork)bytes_to_str)ERRREADpoll)accepts_argument)dumpsloads)cached_property)cycle_by_name)
_parse_url   )virtualredisz5.3.0)sentinelzkombu.transport.redisi     )r         	   error_classes_t)connection_errorschannel_errorsc               
   C  sb   ddl m}  t| dr| j}n| j}ttjjt	t
jtt| j| j| j| jf tjj|| j| jf S )z$Return tuple of redis error classes.r   
exceptionsInvalidData)r   r%   hasattrr&   	DataErrorr!   r   	Transportr"   r   socketerrorIOErrorOSErrorConnectionErrorBusyLoadingErrorAuthenticationErrorTimeoutErrorr#   InvalidResponseResponseError)r%   r(    r4   W/var/www/html/optinet_system/venv/lib/python3.10/site-packages/kombu/transport/redis.pyget_redis_error_classes}   s*   
	r6   c                  C  s   ddl m}  | jS )z1Return the redis ConnectionError exception class.r   r$   )r   r%   r.   r$   r4   r4   r5   get_redis_ConnectionError   s   r7   c                   @     e Zd ZdZdS )	MutexHeldz)Raised when another party holds the lock.N__name__
__module____qualname____doc__r4   r4   r4   r5   r9      s    r9   c                 c  s    | j ||d}d}z(|jdd}|rdV  nt W |r1z|  W dS  tjjy0   Y dS w dS |rGz|  W w  tjjyF   Y w w w )zTAcquire redis lock in non blocking way.

    Raise MutexHeld if not successful.
    timeoutF)blockingN)lockacquirer9   releaser   r%   LockNotOwnedError)clientnameexpirerB   lock_acquiredr4   r4   r5   Mutex   s,   rJ   c                 C     |    d S N)_after_forkchannelr4   r4   r5   _after_fork_cleanup_channel      rP   c                      sl   e Zd ZdZg dZddddddddddddd	Zd
d Z fddZ fddZdddZ	  Z
S )GlobalKeyPrefixMixina  Mixin to provide common logic for global key prefixing.

    Overriding all the methods used by Kombu with the same key prefixing logic
    would be cumbersome and inefficient. Hence, we override the command
    execution logic that is called by all commands.
    )HDELHGETHLENHSETLLENLPUSHPUBLISHRPUSHRPOPSADDSREMSETSMEMBERSZADDZREMZREVRANGEBYSCOREr   N)
args_startargs_end   r   )DELBRPOPEVALSHAWATCHc                   s   t |}|d}| jv r jt|d  |d< n<| jv rV j| d } j| d }|dkr7|d | ng }g }|d urE||d  }| fdd||| D  | }|g|S )Nr   rc   rd   c                      g | ]	} j t| qS r4   global_keyprefixstr.0argselfr4   r5   
<listcomp>       z5GlobalKeyPrefixMixin._prefix_args.<locals>.<listcomp>)listpopPREFIXED_SIMPLE_COMMANDSrm   rn   PREFIXED_COMPLEX_COMMANDS)rs   argscommandrc   rd   pre_args	post_argsr4   rr   r5   _prefix_args   s"   




z!GlobalKeyPrefixMixin._prefix_argsc                   sH   t  j||fi |}|dkr"|r"|\}}|t| jd }||fS |S )zParse a response from the Redis server.

        Method wraps ``redis.parse_response()`` to remove prefixes of keys
        returned by redis command.
        rh   N)superparse_responselenrm   )rs   
connectioncommand_nameoptionsretkeyvalue	__class__r4   r5   r      s   z#GlobalKeyPrefixMixin.parse_responsec                      t  j| |i |S rL   r   execute_commandr~   rs   rz   kwargsr   r4   r5   r        z$GlobalKeyPrefixMixin.execute_commandTc                 C  s   t | j| j||| jdS )Nrm   )PrefixedRedisPipelineconnection_poolresponse_callbacksrm   )rs   transaction
shard_hintr4   r4   r5   pipeline  s   zGlobalKeyPrefixMixin.pipeline)TN)r;   r<   r=   r>   rx   ry   r~   r   r   r   __classcell__r4   r4   r   r5   rR      s    rR   c                   @  s    e Zd ZdZdd Zdd ZdS )PrefixedStrictRedisz@Returns a ``StrictRedis`` client that prefixes the keys it uses.c                 O  s,   | dd| _tjj| g|R i | d S Nrm    )rw   rm   r   Redis__init__r   r4   r4   r5   r     s   zPrefixedStrictRedis.__init__c                 K  s   t | jfd| ji|S )Nrm   )PrefixedRedisPubSubr   rm   )rs   r   r4   r4   r5   pubsub  s   zPrefixedStrictRedis.pubsubN)r;   r<   r=   r>   r   r   r4   r4   r4   r5   r     s    r   c                   @  s   e Zd ZdZdd ZdS )r   a   Custom Redis pipeline that takes global_keyprefix into consideration.

    As the ``PrefixedStrictRedis`` client uses the `global_keyprefix` to prefix
    the keys it uses, the pipeline called by the client must be able to prefix
    the keys as well.
    c                 O  s.   | dd| _tjjj| g|R i | d S r   )rw   rm   r   rF   Pipeliner   r   r4   r4   r5   r   &  s    zPrefixedRedisPipeline.__init__N)r;   r<   r=   r>   r   r4   r4   r4   r5   r     s    r   c                      sD   e Zd ZdZdZ fddZdd Z fddZ fd	d
Z  Z	S )r   zCRedis pubsub client that takes global_keyprefix into consideration.)	SUBSCRIBEUNSUBSCRIBE
PSUBSCRIBEPUNSUBSCRIBEc                   s$   | dd| _t j|i | d S r   )rw   rm   r   r   r   r   r4   r5   r   5  s   zPrefixedRedisPubSub.__init__c                   s8   t |}|d}| jv r fdd|D }|g|S )Nr   c                   rk   r4   rl   ro   rr   r4   r5   rt   >  ru   z4PrefixedRedisPubSub._prefix_args.<locals>.<listcomp>)rv   rw   PUBSUB_COMMANDS)rs   rz   r{   r4   rr   r5   r~   9  s   



z PrefixedRedisPubSub._prefix_argsc                   sF   t  j|i |}|du r|S |^}}}|g fdd|D |S )zParse a response from the Redis server.

        Method wraps ``PubSub.parse_response()`` to remove prefixes of keys
        returned by redis command.
        Nc                   s   g | ]}|t  jd  qS rL   )r   rm   )rp   rO   rr   r4   r5   rt   W  s    z6PrefixedRedisPubSub.parse_response.<locals>.<listcomp>)r   r   )rs   rz   r   r   message_typechannelsmessager   rr   r5   r   E  s   z"PrefixedRedisPubSub.parse_responsec                   r   rL   r   r   r   r4   r5   r   [  r   z#PrefixedRedisPubSub.execute_command)
r;   r<   r=   r>   r   r   r~   r   r   r   r4   r4   r   r5   r   +  s    r   c                      s   e Zd ZdZdZ fddZ fddZd#dd	Z fd
dZd$ fdd	Z	e
d%ddZd#ddZd&ddZd'ddZedd Zedd Zedd Zedd  Zed!d" Z  ZS )(QoSzRedis Ack Emulation.Tc                   s   t  j|i | d| _d S )Nr   )r   r   _vrestore_countr   r   r4   r5   r   d  s   
zQoS.__init__c              	     s   |j }|d |d }}tjd dkr|t ig}nt |g}|  (}|j| jg|R  | j|t	|j
||g  t || W d    d S 1 sNw   Y  d S )Nexchangerouting_keyr   r   )delivery_infor   VERSIONr   pipe_or_acquirezaddunacked_index_keyhsetunacked_keyr   _rawexecuter   append)rs   r   delivery_tagdeliveryEXRK	zadd_argspiper   r4   r5   r   h  s   

"z
QoS.appendNc                 C  sT   | j |}| jD ]	}| j||d q
W d    n1 sw   Y  | j  d S )NrF   )rO   conn_or_acquire
_deliveredrestore_by_tagclear)rs   rF   tagr4   r4   r5   restore_unackedy  s   
zQoS.restore_unackedc                   s   |  |  t | d S rL   )_remove_from_indicesr   r   ack)rs   r   r   r4   r5   r     s   zQoS.ackFc                   s2   |r
| j |dd n| |  t | d S NT)leftmost)r   r   r   r   r   )rs   r   requeuer   r4   r5   reject  s   z
QoS.rejectc                 c  sL    |r|V  d S | j |}| V  W d    d S 1 sw   Y  d S rL   )rO   r   r   )rs   r   rF   r4   r4   r5   r     s   
"zQoS.pipe_or_acquirec                 C  sF   |  |}|| j|| j|W  d    S 1 sw   Y  d S rL   )r   zremr   hdelr   )rs   r   r   r4   r4   r5   r     s   
$zQoS._remove_from_indicesr   
   c           	   
   C  s   |  j d7  _ | j d | rd S | j X}t | j }z7t|| j| j% |j| j	|d|o/||dd}|p7g D ]
\}}| 
|| q8W d    n1 sMw   Y  W n	 ty\   Y n	w W d    d S W d    d S 1 spw   Y  d S )Nr   r   T)startnum
withscores)r   rO   r   r   visibility_timeoutrJ   unacked_mutex_keyunacked_mutex_expirezrevrangebyscorer   r   r9   )	rs   r   r   intervalrF   ceilvisibler   scorer4   r4   r5   restore_visible  s2   
"zQoS.restore_visiblec                   sP    fdd}j |}||j W d    d S 1 s!w   Y  d S )Nc                   sT   |  j}|   |  |r(tt|\}}}j||||   d S d S rL   )hgetr   multir   r   r   rO   _do_restore_message)r   pMr   r   r   rs   r   r4   r5   restore_transaction  s   z/QoS.restore_by_tag.<locals>.restore_transaction)rO   r   r   r   )rs   r   rF   r   r   r4   r   r5   r     s   "zQoS.restore_by_tagc                 C     | j jS rL   )rO   r   rr   r4   r4   r5   r        zQoS.unacked_keyc                 C  r   rL   )rO   r   rr   r4   r4   r5   r     r   zQoS.unacked_index_keyc                 C  r   rL   )rO   r   rr   r4   r4   r5   r     r   zQoS.unacked_mutex_keyc                 C  r   rL   )rO   r   rr   r4   r4   r5   r     r   zQoS.unacked_mutex_expirec                 C  r   rL   )rO   r   rr   r4   r4   r5   r     r   zQoS.visibility_timeoutrL   FNN)r   r   r   )NF)r;   r<   r=   r>   restore_at_shutdownr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r4   r4   r   r5   r   _  s.    







r   c                   @  s   e Zd ZdZeeB ZdZdZdd Z	dd Z
dd	 Zd
d Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd d! Zd"d# Zd(d$d%Zed&d' ZdS ))MultiChannelPollerz%Async I/O poller for Redis transport.FNc                 C  s(   t  | _i | _i | _t | _t  | _d S rL   )set	_channels_fd_to_chan_chan_to_sockr   poller
after_readrr   r4   r4   r5   r     s
   zMultiChannelPoller.__init__c              
   C  sX   | j  D ]}z| j| W q ttfy   Y qw | j  | j  | j   d S rL   )	r   valuesr   
unregisterKeyError
ValueErrorr   r   r   )rs   fdr4   r4   r5   close  s   

zMultiChannelPoller.closec                 C     | j | d S rL   )r   addrs   rO   r4   r4   r5   r        zMultiChannelPoller.addc                 C  r   rL   )r   discardr   r4   r4   r5   r     r   zMultiChannelPoller.discardc              	   C  s.   z
| j |j W d S  ttfy   Y d S w rL   )r   r   _sockAttributeError	TypeErrorrs   r   r4   r4   r5   _on_connection_disconnect  s
   z,MultiChannelPoller._on_connection_disconnectc                 C  sr   |||f| j v r| ||| |jjd u r|j  |jj}||f| j| < || j |||f< | j|| j	 d S rL   )
r   _unregisterr   r   connectr   filenor   register
eventflags)rs   rO   rF   typesockr4   r4   r5   	_register  s   
zMultiChannelPoller._registerc                 C  s   | j | j|||f  d S rL   )r   r   r   )rs   rO   rF   r   r4   r4   r5   r     s   zMultiChannelPoller._unregisterc                 C  sL   t |dd d u rtr|j |_n|jd|_|jjd uo%|||f| jv S )Nr   _)getattr"_REDIS_GET_CONNECTION_WITHOUT_ARGSr   get_connectionr   r   r   )rs   rO   rF   cmdr4   r4   r5   _client_registered  s   z%MultiChannelPoller._client_registeredc                 C  sB   ||j df}| ||j dsd|_| j|  |js|  dS dS )zEnable BRPOP mode for channel.rh   FN)rF   r  _in_pollr  _brpop_start)rs   rO   identr4   r4   r5   _register_BRPOP  s   
z"MultiChannelPoller._register_BRPOPc                 C  s<   |  ||jdsd|_| ||jd |js|  dS dS )zEnable LISTEN mode for channel.LISTENFN)r  	subclient
_in_listenr  
_subscriber   r4   r4   r5   _register_LISTEN  s   z#MultiChannelPoller._register_LISTENc                 C  s:   | j D ]}|jr|j r| | |jr| | qd S rL   )r   active_queuesqoscan_consumer  active_fanout_queuesr  r   r4   r4   r5   on_poll_start  s   



z MultiChannelPoller.on_poll_startc                 C  s(   || _ | jD ]}|jj|jd  S d S N)r   )r   r   r  r   unacked_restore_limit)rs   r   rO   r4   r4   r5   on_poll_init&  s   

zMultiChannelPoller.on_poll_initc                 C  s*   | j D ]}|jr|jj|jd  S qd S r  )r   r  r  r   r  r   r4   r4   r5   maybe_restore_messages-  s   

z)MultiChannelPoller.maybe_restore_messagesc                 C  s<   | j D ]}|jd}|d urtt|dd r|  qd S )Nr  check_health)r   __dict__getcallabler  r  )rs   rO   rF   r4   r4   r5   maybe_check_subclient_health5  s   
z/MultiChannelPoller.maybe_check_subclient_healthc                 C  s,   | j | \}}|j r|j|   d S d S rL   )r   r  r  handlers)rs   r   chanr   r4   r4   r5   on_readable=  s   
zMultiChannelPoller.on_readablec                 C  s>   |t @ r| || fS |t@ r| j| \}}|| d S d S rL   )r   r"  r   r   _poll_error)rs   r   eventr!  r   r4   r4   r5   handle_eventB  s   zMultiChannelPoller.handle_eventc           	      C  s   d| _ z]| jD ]}|jr|j r| | |jr| | q| j	|}|rZ|D ]0\}}| 
||}|rY W d| _ | jrWz| j }W n
 tyN   Y d S w |  | js=d S d S q)|   t d| _ | jr~z| j }W n	 tyw   Y w w |  | jsgw )NTF)_in_protected_readr   r  r  r  r  r  r  r   r   r%  r   rw   r   r  r   )	rs   callbackr@   rO   eventsr   r$  r   funr4   r4   r5   r  I  sJ   



zMultiChannelPoller.getc                 C  s   | j S rL   )r   rr   r4   r4   r5   fdsg  s   zMultiChannelPoller.fdsrL   )r;   r<   r=   r>   r   r   r   r&  r   r   r   r   r   r   r  r   r  r  r  r  r  r  r  r"  r%  r  propertyr*  r4   r4   r4   r5   r     s0    

		
r   c                      s  e Zd ZdZeZdZdZdZdZdZ	dZ
dZdZdZi ZdZdZd	Zd
ZdZdZdZeZdZdZdZdZdZdZeZdZ dZ!dZ"dZ#dZ$dZ%e&j'j(d Z(e)rQe)j*ndZ+e)rXe)j,ndZ- fddZ.dd Z/dd Z0dd Z1	drddZ2dr fdd	Z3dd Z4 fdd Z5d!d" Z6 fd#d$Z7d%d& Z8d'd( Z9d)d* Z:d+d, Z;d-d. Z<d/d0 Z=d1d2 Z>dsd4d5Z?d6d7 Z@d8d9 ZAd:d; ZBd<d= ZCd>d? ZDd@dA ZEdBdC ZFdDdE ZGdrdFdGZHdHdI ZIdJdK ZJdLdM ZKdNdO ZLdPdQ ZM fdRdSZNdTdU ZOdVdW ZP		dtdXdYZQdrdZd[ZRdrd\d]ZSdrd^d_ZTd`da ZUeVdudbdcZWeXddde ZYeXdfdg ZZe[dhdi Z\e[djdk Z]dldm Z^dndo Z_eXdpdq Z`  ZaS )vChannelzRedis Channel.NFTz_kombu.binding.%sz/{db}.zunackedunacked_indexunacked_mutexi,  i  r   r   round_robin)sepack_emulationr   r   r   r   r   r  fanout_prefixfanout_patternsrm   socket_timeoutsocket_connect_timeoutsocket_keepalivesocket_keepalive_optionsqueue_order_strategymax_connectionshealth_check_intervalretry_on_timeoutpriority_stepsc                   s   t  j|i | | jstj| _d| _t| j | _| 	 | _
|  | _t | _t | _i | _| j| jd| _| jrEt| jtrD| j| _nd| _z| j  W n ty[   |    w | jj|  d| _| jj| _t d urvt | t! d S d S )NF)rh   r  r   T)"r   r   r2  r   r   _registeredr   r9  _queue_cycle_get_clientClient_get_response_errorr3   r   r  auto_delete_queues_fanout_to_queue_brpop_read_receiver   r3  
isinstancern   keyprefix_fanoutrF   ping	Exception_disconnect_poolsr   cycler   r"   r   rP   r   r   r4   r5   r     s8   


zChannel.__init__c                 C  rK   rL   )rK  rr   r4   r4   r5   rM     rQ   zChannel._after_forkc                 C  s@   | j }| j}d  | _| _ |d ur|  |d ur|  d S d S rL   )_pool_async_pool
disconnect)rs   pool
async_poolr4   r4   r5   rK    s   zChannel._disconnect_poolsc                 C  sH   | j |u rd | _ | j|u rd | _| jr | jjr"| jj| d S d S d S rL   )r	  r  r   rL  r   r   r4   r4   r5   r     s   

z!Channel._on_connection_disconnectc                 C  s   z>zd|d d< d|d d d< W n	 t y   Y nw | ||D ]}| j|dd}|r/|jn|j| ||t| q!W d S  tyO   td|dd	 Y d S w )
NTheadersredelivered
propertiesr   FreversezCould not restore message: %rexc_info)	r   _lookup_get_message_prioritylpushrpush
_q_for_prir   rJ  crit)rs   payloadr   r   r   r   queueprir4   r4   r5   r     s    zChannel._do_restore_messagec                   sd   j s	t |S |j fdd} }||j W d    d S 1 s+w   Y  d S )Nc                   sT   |  j}|   | j |r(tt|\}}}||||   d S d S rL   )r   r   r   r   r   r   r   )r   Pr   r   r   r   r4   r5   r   ,  s   z-Channel._restore.<locals>.restore_transaction)r2  r   _restorer   r   r   r   )rs   r   r   r   rF   r   r   r5   rc  '  s   
"zChannel._restorec                 C  s   | j |ddS r   )rc  )rs   r   r4   r4   r5   _restore_at_beginning7  s   zChannel._restore_at_beginningc                   sT   || j v r| j | \}}| j| || j|< t j|g|R i |}|   |S rL   )_fanout_queuesr  r   rD  r   basic_consume_update_queue_cycle)rs   r`  rz   r   r   r  r   r   r4   r5   rf  :  s   

zChannel.basic_consumec                 C  s8   | j }|r|jjr|jjt| j|fS | |S d S rL   )r   rL  r&  r   r   r
   _basic_cancel)rs   consumer_tagr   r4   r4   r5   basic_cancelN  s   
zChannel.basic_cancelc                   s   z| j | }W n
 ty   Y d S w z| j| W n	 ty#   Y nw | | z| j| \}}| j| W n	 tyA   Y nw t 	|}| 
  |S rL   )_tag_to_queuer   r  remove_unsubscribe_fromre  rD  rw   r   rj  rg  )rs   ri  r`  r   r  r   r   r4   r5   rh  [  s(   
zChannel._basic_cancelc                 C  s.   |r| j rd| j|d|gS d| j|gS )Nr   /)r4  joinrH  )rs   r   r   r4   r4   r5   _get_publish_topico  s   
zChannel._get_publish_topicc                 C  s   | j | \}}| ||S rL   )re  rp  )rs   r`  r   r   r4   r4   r5   _get_subscribe_topict  s   zChannel._get_subscribe_topicc                   sN    fdd j D }|sd S  j}|jjd u r|j  |j _|| d S )Nc                   s   g | ]}  |qS r4   )rq  rp   r`  rr   r4   r5   rt   y  s    z&Channel._subscribe.<locals>.<listcomp>)r  r  r   r   r   r  
psubscribe)rs   keyscr4   rr   r5   r  x  s   

zChannel._subscribec                 C  s6   |  |}| j}|jr|jjr||g d S d S d S rL   )rq  r  r   r   unsubscribe)rs   r`  topicru  r4   r4   r5   rm    s
   
zChannel._unsubscribe_fromc                 C  s   t |d dkr|d dkrd|_d S t |d dkr.|d |d |d |d f\}}}}n|d d |d |d f\}}}}||||dS )	Nr   rv  rf   Fpmessager   r   )r   patternrO   data)r   
subscribed)rs   rF   rr   ry  rO   rz  r4   r4   r5   _handle_message  s   & zChannel._handle_messagec                 C  sz   | j }g }z
|| | W n	 ty   Y nw |jd ur9|jjddr9|| | |jd ur9|jjdds%t|S )Nr   r?   )r  r   _receive_oner   r   can_readany)rs   ru  r   r4   r4   r5   rF    s   zChannel._receivec              	   C  s  d }z|  }W n | jy   d | _ w t|ttfr|| ||}t|d dr~t|d }|d r|d dkrC|	d\}}}z
t
t|d }W n ttfyg   td|t|d d	 d
d t w |dd
d }| j|| j|  dS d S d S d S )Nr   r   rO   rz  r   rn  .z&Cannot process event on channel %r: %si   r   rW  T)r   r"   r  rG  rv   tupler}  r   endswith	partitionr   r   r   warningreprr   splitr   _deliverrD  )rs   ru  responser_  rO   r  r   r   r4   r4   r5   r~    s<   
zChannel._receive_oner   c                   sr   j tj  sd S  fddjD |pdg }jj_dg|}jr0j	|}jjj
|  d S )Nc                   s"   g | ]} D ]} ||qqS r4   )r]  )rp   ra  r`  queuesrs   r4   r5   rt     s
    z(Channel._brpop_start.<locals>.<listcomp>r   rh   )r?  consumer   r  r=  rF   r   r	  rm   r~   send_command)rs   r@   rt  command_argsr4   r  r5   r
    s   

zChannel._brpop_startc                 K  s   zJz| j j| j jdfi |}W n | jy   | j j   w |rH|\}}t|| jdd }| j	| | j
tt|| W d | _dS t d | _w )Nrh   r   r   T)rF   r   r   r"   rO  r   rsplitr1  r?  rotater  r   r	  r   )rs   r   
dest__itemdestitemr4   r4   r5   rE    s(   

zChannel._brpop_readc                 K  s,   |dkr| j   d S | j| jj| d S )Nr  )r  r   rF   r   )rs   r   r   r4   r4   r5   r#    s   zChannel._poll_errorc                 C  sd   |   $}| jD ]}|| ||}|r$tt|  W  d    S qt 1 s+w   Y  d S rL   )r   r=  rpopr]  r   r   r   )rs   r`  rF   ra  r  r4   r4   r5   _get  s   

zChannel._getc              	   C  s   |   @}| +}| jD ]}|| ||}q| }tdd |D W  d    W  d    S 1 s7w   Y  W d    d S 1 sGw   Y  d S )Nc                 s  s     | ]}t |tjr|V  qd S rL   )rG  numbersIntegral)rp   sizer4   r4   r5   	<genexpr>  s    
z Channel._size.<locals>.<genexpr>)r   r   r=  llenr]  r   sum)rs   r`  rF   r   ra  sizesr4   r4   r5   _size  s   


"zChannel._sizec                 C  s$   |  |}|r| | j | S |S rL   )priorityr1  )rs   r`  ra  r4   r4   r5   r]    s   
zChannel._q_for_pric                 C  s   | j }|t||d  S )Nr   )r=  r   )rs   nstepsr4   r4   r5   r    s   zChannel.priorityc                 K  sT   | j |dd}|  }|| ||t| W d   dS 1 s#w   Y  dS )zDeliver message.FrU  N)rZ  r   r[  r]  r   )rs   r`  r   r   ra  rF   r4   r4   r5   _put  s   
"zChannel._putc                 K  sF   |   }|| ||t| W d   dS 1 sw   Y  dS )zDeliver fanout message.N)r   publishrp  r   )rs   r   r   r   r   rF   r4   r4   r5   _put_fanout	  s   

"zChannel._put_fanoutc                 K  s   |r
| j | d S d S rL   )rC  r   )rs   r`  auto_deleter   r4   r4   r5   
_new_queue  s   zChannel._new_queuec              	   C  s   |  |jdkr||ddf| j|< |   }|| j|f | j|p%d|p(d|p+dg W d    d S 1 s:w   Y  d S )Nfanout#*r   )	typeofr   replacere  r   saddkeyprefix_queuer1  ro  )rs   r   r   ry  r`  rF   r4   r4   r5   _queue_bind  s   

"zChannel._queue_bindc           
   	   O  s   | j | | j|ddO}|| j|f | j|pd|p d|p#dg | }| j	D ]}	|
| ||	}q/|  W d    n1 sIw   Y  W d    d S W d    d S 1 saw   Y  d S )NrF   r   r   )rC  r   r   r  sremr  r1  ro  r   r=  deleter]  r   )
rs   r`  r   r   ry  rz   r   rF   r   ra  r4   r4   r5   _delete!  s    


"zChannel._deletec              	   K  s   |   9}| $}| jD ]}|| ||}qt| W  d    W  d    S 1 s0w   Y  W d    d S 1 s@w   Y  d S rL   )r   r   r=  existsr]  r  r   )rs   r`  r   rF   r   ra  r4   r4   r5   
_has_queue-  s   



"zChannel._has_queuec                   sh    j | }  !}||}|sg W  d    S  fdd|D W  d    S 1 s-w   Y  d S )Nc                   s    g | ]}t t| jqS r4   )r  r   r  r1  )rp   valrr   r4   r5   rt   <  s     z%Channel.get_table.<locals>.<listcomp>)r  r   smembers)rs   r   r   rF   r   r4   rr   r5   	get_table4  s   


$zChannel.get_tablec              	   C  s   |   E}| 0}| jD ]}| ||}|||}q| }t|d d d W  d    W  d    S 1 s<w   Y  W d    d S 1 sLw   Y  d S )Nrf   )r   r   r=  r]  r  r  r   r  )rs   r`  rF   r   ra  priqr  r4   r4   r5   _purge>  s   


"zChannel._purgec                   s   d| _ | jrz|   W n	 ty   Y nw | jsD| jj|  | j	d}|d ur<| j
D ]}|| jv r;| j||d q-|   |   t   d S )NTrF   r   )_closingr	  rE  r   closedr   rL  r   r  r  re  rC  queue_deleterK  _close_clientsr   r   )rs   rF   r`  r   r4   r5   r   G  s$   

zChannel.closec                 C  sL   dD ]!}z| j | }|jd }|_|  W q tt| jfy#   Y qw d S )N)rF   r  )r  r   rO  r   r   r3   )rs   attrrF   r   r4   r4   r5   r  \  s   
zChannel._close_clientsc                 C  sd   t |tjs0|r|dkrt}n|dr|dd  }zt|}W |S  ty/   td|w |S )Nrn  r   z/Database is int between 0 and limit - 1, not {})rG  r  r  
DEFAULT_DB
startswithintr   format)rs   vhostr4   r4   r5   _prepare_virtual_hostf  s    

zChannel._prepare_virtual_hostc                 K  s   |S rL   r4   )rs   r7  r8  paramsr4   r4   r5   _filter_tcp_connparamsu  s   zChannel._filter_tcp_connparamsc                   s  | j j}|jpd|jp| j j|j|j|j| j| j	| j
| j| j| j| jd}| j}t|drN|g}t|dr<|t|j7 }|D ]
}t|jdrH nq>|d |jrhz||j | j|d< W n	 tyg   Y nw |d }d|v rt|\}}	}	}
}}}|d	kr| jdi |}|jtjd
| dfi | |dd  |dd  |dd  |
|d< ||d< |dd  |dd  | |dd |d< |  |dp| j}|rG  fddd|}|}||d< |S )Nz	127.0.0.1)hostportvirtual_hostusernamepasswordr:  r5  r6  r7  r8  r;  r<  r   	__bases__r;  connection_classr  z://r*   rn  )r  pathr6  r7  r8  r  r  r  r  dbc                      s   e Zd Z fddZ  ZS )z'Channel._connparams.<locals>.Connectionc                   s$   t  j|  jr|  d S d S rL   )r   rO  r>  r   )rs   rz   )r   rO   r4   r5   rO    s   z2Channel._connparams.<locals>.Connection.disconnect)r;   r<   r=   rO  r   r4   rN   r   r5   
Connection  s    r  r4   ) r   rF   hostnamer  default_portr  useridr  r:  r5  r6  r7  r8  r;  r<  r  r'   rv   r  r   r   rw   sslupdateconnection_class_sslr   r   r  r   UnixDomainSocketConnectionr  r  )rs   asynchronousconninfo
connparams
conn_classclassesklassr  schemer  r  r  r  queryconnection_clsr  r4   rN   r5   _connparamsy  sz   




zChannel._connparamsc                 C  s    |r	| j | jdS | j | jdS )N)r   )rA  rQ  rP  )rs   r  r4   r4   r5   _create_client  s   zChannel._create_clientc                 C  s0   | j |d}| jj|d d| _tjdi |S )Nr  r  r  r4   )r  rH  r  r   ConnectionPoolrs   r  r  r4   r4   r5   	_get_pool  s   zChannel._get_poolc                 C  s4   t jdk rtdt | jrtjt| jdS t jS )N)r   rf   r   zSRedis transport requires redis-py versions 3.2.0 or later. You have {0.__version__}r   )	r   r   r   r  rm   	functoolspartialr   r   rr   r4   r4   r5   r@    s   
zChannel._get_clientc                 c  s    |r|V  d S |   V  d S rL   r  rs   rF   r4   r4   r5   r     s   
zChannel.conn_or_acquirec                 C  s   | j d u r
|  | _ | j S rL   )rM  r  rr   r4   r4   r5   rP    s   

zChannel.poolc                 C  s   | j d u r| jdd| _ | j S )NTr  )rN  r  rr   r4   r4   r5   rQ    s   
zChannel.async_poolc                 C  s   | j ddS )z+Client used to publish messages, BRPOP etc.Tr  r  rr   r4   r4   r5   rF     s   zChannel.clientc                 C  s   | j dd}| S )z1Pub/Sub connection used to consume fanout queues.Tr  )r  r   r  r4   r4   r5   r    s   zChannel.subclientc                 C  s   | j | j d S rL   )r?  r  r  rr   r4   r4   r5   rg     s   zChannel._update_queue_cyclec                 C  s   ddl m} |jS )Nr   r$   )r   r%   r3   )rs   r%   r4   r4   r5   rB    s   zChannel._get_response_errorc                   s    fdd j D S )z<Set of queues being consumed from (excluding fanout queues).c                   s   h | ]	}| j vr|qS r4   )r  rr  rr   r4   r5   	<setcomp>
  s    
z(Channel.active_queues.<locals>.<setcomp>)_active_queuesrr   r4   rr   r5   r    s   zChannel.active_queuesr   )r   r   rL   )br;   r<   r=   r>   r   _client
_subclientr  supports_fanoutr  rH  r1  r	  r  re  r2  r   r   r   r   r  r   PRIORITY_STEPSr=  r5  r6  r7  r8  r<  r:  DEFAULT_HEALTH_CHECK_INTERVALr;  r3  r4  rm   r9  rN  rM  r   r,  from_transport_optionsr   r  r  SSLConnectionr  r   rM   rK  r   r   rc  rd  rf  rj  rh  rp  rq  r  rm  r}  rF  r~  r
  rE  r#  r  r  r]  r  r  r  r  r  r  r  r  r  r   r  r  r  r  r  r  r@  r   r   r+  rP  rQ  r   rF   r  rg  rB  r  r   r4   r4   r   r5   r,  l  s    (	

	

	



Q




r,  c                      st   e Zd ZdZeZdZeZdZdZ	e
jjjdeg ddZer$e \ZZ fddZd	d
 Zdd Zdd Z  ZS )r)   zRedis Transport.Nr   T)directrw  r  )r  exchange_typec                   s.   t d u rtdt j|i | t | _d S )Nz)Missing redis library (pip install redis))r   ImportErrorr   r   r   rL  r   r   r4   r5   r      s   zTransport.__init__c                 C  s   t jS rL   )r   __version__rr   r4   r4   r5   driver_version(  s   zTransport.driver_versionc                   s   | j j jj | jfdd}|_ fddj 	dj
 |jjdt}	|j d S )Nc                   sD   | j r	| j   jr z	j W d S  ty   Y d S w d S rL   )r   rl  r*  on_tickr   )r   )rL  loopr  r4   r5   _on_disconnect2  s   z:Transport.register_with_event_loop.<locals>._on_disconnectc                     s       fddj D  d S )Nc                   s   g | ]} ||qS r4   r4   )rp   r   )
add_readerr"  r4   r5   rt   A  s    zMTransport.register_with_event_loop.<locals>.on_poll_start.<locals>.<listcomp>)r*  r4   )r  rL  cycle_poll_startr"  r4   r5   r  ?  s   z9Transport.register_with_event_loop.<locals>.on_poll_startr   r;  )rL  r  r   r  r  r"  r   r  r   call_repeatedlyr  rF   transport_optionsr  r  r  )rs   r   r  r  r;  r4   )r  rL  r  r  r  r"  r5   register_with_event_loop+  s$   z"Transport.register_with_event_loopc                 C  s   | j | dS )z1Handle AIO event for one of our file descriptors.N)rL  r"  )rs   r   r4   r4   r5   r"  M  s   zTransport.on_readable)r;   r<   r=   r>   r,  polling_intervalDEFAULT_PORTr  driver_typedriver_namer   r)   
implementsextend	frozensetr   r6   r"   r#   r   r  r  r"  r   r4   r4   r   r5   r)     s"    

"r)   c                   @  r8   )SentinelManagedSSLConnectionzConnect to a Redis server using Sentinel + TLS.

        Use Sentinel to identify which Redis server is the current master
        to connect to and when connecting to the Master server, use an
        SSL Connection.
        Nr:   r4   r4   r4   r5   r  S  s    r  c                   @  sH   e Zd ZdZejd ZerejndZere	ndZ
d	ddZd	ddZdS )
SentinelChannela  Channel with explicit Redis Sentinel knowledge.

    Broker url is supposed to look like:

    .. code-block::

        sentinel://0.0.0.0:26379;sentinel://0.0.0.0:26380/...

    where each sentinel is separated by a `;`.

    Other arguments for the sentinel should come from the transport options
    (see `transport_options` of :class:`~kombu.connection.Connection`).

    You must provide at least one option in Transport options:
     * `master_name` - name of the redis group to poll

    Example:
    -------
    .. code-block:: python

        >>> import kombu
        >>> c = kombu.Connection(
             'sentinel://sentinel1:26379;sentinel://sentinel2:26379',
             transport_options={'master_name': 'mymaster'}
        )
        >>> c.connect()
    )master_namemin_other_sentinelssentinel_kwargsNFc           	      C  s   |  |}| }|dd  |dd  g }| jjjD ]}t|}|jdkr6|jp-| jj	}|
|j|f q|sD|
|d |d f tj|ft| ddt| dd d|}t| dd }|d u rftd	||tjjS )
Nr  r  r   r
  r   r  )r
  r  r	  z1'master_name' transport option must be specified.)r  copyrw   r   rF   altr   r  r  r  r   r  r   Sentinelr  r   
master_forr   r   r   )	rs   r  r  additional_params	sentinelsurlr  sentinel_instr	  r4   r4   r5   _sentinel_managed_pool  s@   



z&SentinelChannel._sentinel_managed_poolc                 C  s*   | j |d}| jj|d d| _| |S )Nr  r  r  )r  rH  r  r  r  r4   r4   r5   r    s   
zSentinelChannel._get_poolr   )r;   r<   r=   r>   r,  r  r   SentinelManagedConnectionr  r  r  r  r  r4   r4   r4   r5   r  `  s    

%r  c                   @  s   e Zd ZdZdZeZdS )SentinelTransportzRedis Sentinel Transport.ig  N)r;   r<   r=   r>   r  r  r,  r4   r4   r4   r5   r    s    r  )Rr>   
__future__r   r  r  r*   r   collectionsr   
contextlibr   importlib.metadatar   r`  r   r   packaging.versionr	   viner
   kombu.exceptionsr   r   	kombu.logr   kombu.utils.compatr   kombu.utils.encodingr   kombu.utils.eventior   r   r   kombu.utils.functionalr   kombu.utils.jsonr   r   kombu.utils.objectsr   kombu.utils.schedulingr   kombu.utils.urlr   r   r   r   r  r  r   loggercriticalr  r^  r  r  r  r  r!   r6   r7   rJ  r9   rJ   rP   rR   r   r   rF   r   r   PubSubr   r   r   r,  r)   r  r  r  r  r  r4   r4   r4   r5   <module>   s    5

S4k #     'D
P