o
    	hk                     @  s  d Z ddlmZ ddlZddlZddlZddlZddlmZm	Z	m
Z
mZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZmZ dd
lmZmZ ddlmZ ddlmZ ddlmZmZm Z  ddl!m"Z" ddl#m$Z$ ddl%m&Z& ddl'm(Z(m)Z) ddl'm*Z+ ddl,m*Z- ddl.m*Z/ ddl0m1Z2 ddl3m4Z4 ddl5m6Z6 ddl7m8Z8m9Z9 ddl:m;Z;m<Z< ddl=m>Z> ddl?m@Z@ e6dZAeBejCh d ZDeEdeEdidd  eDD ZFG d!d" d"ZGG d#d$ d$ZHejIG d%d& d&ZJG d'd( d(e@jKZKG d)d* d*e@jLZLdS )+ap  GCP Pub/Sub transport module for kombu.

More information about GCP Pub/Sub:
https://cloud.google.com/pubsub

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: No
* Supports Fanout: Yes
* Supports Priority: No
* Supports TTL: No

Connection String
=================

Connection string has the following formats:

.. code-block::

    gcpubsub://projects/project-name

Transport Options
=================
* ``queue_name_prefix``: (str) Prefix for queue names.
* ``ack_deadline_seconds``: (int) The maximum time after receiving a message
  and acknowledging it before pub/sub redelivers the message.
* ``expiration_seconds``: (int) Subscriptions without any subscriber
  activity or changes made to their properties are removed after this period.
  Examples of subscriber activities include open connections,
  active pulls, or successful pushes.
* ``wait_time_seconds``: (int) The maximum time to wait for new messages.
  Defaults to 10.
* ``retry_timeout_seconds``: (int) The maximum time to wait before retrying.
* ``bulk_max_messages``: (int) The maximum number of messages to pull in bulk.
  Defaults to 32.
    )annotationsN)FIRST_COMPLETEDFutureThreadPoolExecutorwait)suppress)getpid)Empty)Lock)	monotonicsleep)NAMESPACE_OIDuuid3)gethostnametimeout)AlreadyExistsDeadlineExceededPermissionDenied)Retry)monitoring_v3)query)PublisherClientSubscriberClient)
exceptions)gapic_version)TRANSIENT_DELIVERY_MODE)
get_logger)bytes_to_strsafe_str)dumpsloads)cached_property   )virtualzkombu.transport.gcpubsub>   ._-r%   r'   c                 C  s   i | ]	}t |t d qS )r&   )ord).0c r+   Z/var/www/html/optinet_system/venv/lib/python3.10/site-packages/kombu/transport/gcpubsub.py
<dictcomp>S   s    r-   c                   @  sL   e Zd ZdZdd Zdd Zddd	ZdddZdd Zdd Z	dd Z
dS )
UnackedIdszThreadsafe list of ack_ids.c                 C  s   g | _ t | _d S N)_listr
   _lockselfr+   r+   r,   __init__Z      zUnackedIds.__init__c                 C     | j | d S r/   )r0   appendr3   valr+   r+   r,   r7   ^      zUnackedIds.appendvalslistc                 C  r6   r/   )r0   extend)r3   r;   r+   r+   r,   r=   b   r:   zUnackedIds.extendc                 C  s6   | j  | j|W  d    S 1 sw   Y  d S r/   )r1   r0   pop)r3   indexr+   r+   r,   r?   f   s   
$zUnackedIds.popc              	   C  sp   | j + tt | j| W d    n1 sw   Y  W d    d S W d    d S 1 s1w   Y  d S r/   )r1   r   
ValueErrorr0   remover8   r+   r+   r,   rB   j   s   PzUnackedIds.removec                 C  s4   | j  t| jW  d    S 1 sw   Y  d S r/   )r1   lenr0   r2   r+   r+   r,   __len__n   s   $zUnackedIds.__len__c                 C  s
   | j | S r/   )r0   )r3   itemr+   r+   r,   __getitem__r      
zUnackedIds.__getitem__N)r;   r<   )r>   )__name__
__module____qualname____doc__r4   r7   r=   r?   rB   rD   rF   r+   r+   r+   r,   r.   W   s    

r.   c                   @  s6   e Zd ZdZdddZdddZddd	Zd
d ZdS )AtomicCounterzIThreadsafe counter.

    Returns the value after inc/dec operations.
    r   c                 C  s   || _ t | _d S r/   )_valuer
   r1   )r3   initialr+   r+   r,   r4   }   r5   zAtomicCounter.__init__r#   c                 C  s>   | j  |  j|7  _| jW  d    S 1 sw   Y  d S r/   r1   rM   r3   nr+   r+   r,   inc      $zAtomicCounter.incc                 C  s>   | j  |  j|8  _| jW  d    S 1 sw   Y  d S r/   rO   rP   r+   r+   r,   dec   rS   zAtomicCounter.decc                 C  s0   | j  | jW  d    S 1 sw   Y  d S r/   rO   r2   r+   r+   r,   get   s   $zAtomicCounter.getN)r   )r#   )rH   rI   rJ   rK   r4   rR   rT   rU   r+   r+   r+   r,   rL   w   s    


rL   c                   @  sF   e Zd ZU dZded< ded< ded< ded< ejedZded	< d
S )QueueDescriptorzPub/Sub queue descriptor.strname
topic_pathsubscription_idsubscription_path)default_factoryr.   unacked_idsN)	rH   rI   rJ   rK   __annotations__dataclassesfieldr.   r]   r+   r+   r+   r,   rV      s   
 rV   c                      s  e Zd ZU dZdZdZdZdZdZdZ	dZ
dZe Zd	Zd
ed< e Ze Zi Zded< e Zded<  fddZefddddZdd Z		dedfddZdgd!d"Z												dhdid%d&Zd'd( Zd)d* Zd+d, Z dedjd0d1Z!dkd4d5Z"djd6d7Z#dld8d9Z$de fd:d;	Z%dmd<d=Z&dn fd>d?	Z'dodBdCZ(dpdDdEZ)dFdG Z*dpdHdIZ+e,dJdK Z-e,dLdM Z.e,dNdO Z/e0dPdQ Z1e0dRdS Z2e,dTdU Z3e,dVdW Z4e,dXdY Z5e,dZd[ Z6e,d\d] Z7e,d^d_ Z8 fd`daZ9e:dbdc Z;  Z<S )qChannelzGCP Pub/Sub channel.TF
      iQ i,      Nzthreading.Thread_unacked_extenderzdict[str, QueueDescriptor]_queue_cachezset[str]_tmp_subscriptionsc                   sv   t  j|i | t | _td| jj t	| jj| _
| j dkr9tj| jddt_| j  tj  d S d S )Nznew GCP pub/sub channel: %sr#   T)targetdaemon)superr4   r   poolloggerinfoconninfohostname	Transport	parse_uri
project_id_n_channelsrR   	threadingThread_extend_unacked_deadlinera   re   _stop_extenderclearstart)r3   argskwargs	__class__r+   r,   r4      s   
zChannel.__init__rX   rW   returnc                 C  s(   | | js| j| }tt||S )z7Format AMQP queue name into a valid Pub/Sub queue name.)
startswithqueue_name_prefixrW   r   	translate)r3   rX   tabler+   r+   r,   entity_name   s   
zChannel.entity_namec                 C  s
  |  |j}| |}td|||| i }|dkr.dd| di}| j| j|}| j}n7|dkr]t	t
t  dt   }	| d|	 }
| j| j|
}| j| | j| d	}ntd
| d| | j||}| j||||d t||||d}|| j|< d S )Nz9binding queue: %s to %s exchange: %s with routing_key: %sdirectfilterzattributes.routing_key=""fanoutr%   r'   iX  zexchange type z not implemented)rY   r[   filter_argsmsg_retention)rX   rY   rZ   r[   )typeoftyper   rl   debug
subscriberr[   rr   expiration_secondsr   r   r   r   rg   add_fanout_exchangesNotImplementedError_create_topic_create_subscriptionrV   rf   )r3   exchangerouting_keypatternqueueexchange_typer   r[   message_retention_durationuiduniq_sub_nameexchange_topicqdescr+   r+   r,   _queue_bind   sZ   

zChannel._queue_bindrr   topic_idr   intc                 C  sx   | j ||}| |rtd| |S ztd| d|i}|r(| d|d< | j j|d W |S  ty;   Y |S w )Nztopic: %s existszcreating topic: %srX   sr   request)	publisherrY   _is_topic_existsrl   r   create_topicr   )r3   rr   r   r   rY   r   r+   r+   r,   r     s$   
zChannel._create_topicrY   boolc                 C  s8   | j jdd| j id}|D ]
}|j|kr dS qdS )Nprojectz	projects/r   TF)r   list_topicsrr   rX   )r3   rY   topicstr+   r+   r,   r     s   
zChannel._is_topic_existsr[   r   c                 C  s   |p	| j | j|}|p| j||}z+td||| |p | j}| j j||| j	d| j di| dd|p8i d W |S  t
yH   Y |S w )Nz0creating subscription: %s, topic: %s, filter: %sttlr   )rX   topicack_deadline_secondsexpiration_policyr   r   )r   r[   rr   r   rY   rl   r   r   create_subscriptionr   r   )r3   rr   r   rY   r[   r   r   r+   r+   r,   r   #  s<   


zChannel._create_subscriptionc                 O  sP   |  |}td| | j|}|sdS | jjd|jid | j|d dS )zDelete a queue by name.zdeleting queue: %sNsubscriptionr   )	r   rl   rm   rf   rU   r   delete_subscriptionr[   r?   )r3   r   rz   r{   r   r+   r+   r,   _deleteK  s   
zChannel._deletec                 K  sV   |  |}| j| }| |}td||j| t|}| jj|j|	d|d dS )zPut a message onto the queue.z8putting message to queue: %s, topic: %s, routing_key: %sutf-8)r   N)
r   rf   _get_routing_keyrl   r   rY   r    r   publishencode)r3   r   messager{   r   r   encoded_messager+   r+   r,   _putW  s   



zChannel._putc                 K  sV   |  || | j| j|}td|| t|}| jj||dt	| j
dd dS )z#Put a message onto fanout exchange.z-putting msg to fanout exchange: %s, topic: %sr   deadline)retryN)_lookupr   rY   rr   rl   r   r    r   r   r   retry_timeout_seconds)r3   r   r   r   r{   rY   r   r+   r+   r,   _put_fanouti  s   

zChannel._put_fanoutr   r   floatc           	      C  s   |  |}| j| }z| jj|jddt| jd|p| jd}W n
 ty*   t	 w t
|jdkr5t	 |jd }|j}t|jj}|d d }td|||d  | |d rjtd	| | |g|j |S |||jj|jd
|d< |j| |S )z(Retrieves a single message from a queue.r#   r   max_messagesr   r   r   r   r   
propertiesdelivery_infoz-queue:%s got message, ack_id: %s, payload: %szauto acking message ack_id: %sr   ack_id
message_idr[   gcpubsub_message)r   rf   r   pullr[   r   r   wait_time_secondsr   r	   rC   received_messagesr   r!   r   datarl   r   _is_auto_ack_do_ackr   r]   r7   )	r3   r   r   r   responser   r   payloadr   r+   r+   r,   _gety  sH   






zChannel._getpayload_propertiesdictc                 C  s&   |d d }|d }|t kp|| jv S )Nr   r   delivery_mode)r   r   )r3   r   r   r   r+   r+   r,   r     s
   zChannel._is_auto_ackc                 C  s4  |  |}| j| }|  }|st z| jj|j|dt| jd|p%| j	d}W n
 t
y3   t w |j}t|dkr@t g }g }	tdt|| |D ]7}
|
j}tt|
jj}|d d }|||
jj|jd|d	< | |d r{|| n|j| |	| qO|rtd
| | ||j ||	fS )z(Retrieves bulk of messages from a queue.r   r   r   r   z#batching %d messages from queue: %sr   r   r   r   zauto acking ack_ids: %s)r   rf   _get_max_messages_estimater	   r   r   r[   r   r   r   r   r   rC   rl   r   r   r!   r   r   r   r   r   r7   r]   r   )r3   r   r   prefixed_queuer   r   r   r   auto_ack_idsret_payloadsr   r   r   r   r+   r+   r,   	_get_bulk  sV   




zChannel._get_bulkc                 C  s    | j  }| j}|d u r|S |S r/   )qoscan_consume_max_estimatebulk_max_messages)r3   max_allowedmax_if_unlimitedr+   r+   r,   r     s   
z"Channel._get_max_messages_estimatec                   sh   | j j|i }|st |||S | || ||||}|r$|S t	d| | 
||| |gS )Nz3no queues bound to exchange: %s, binding on the fly)state	exchangesrU   rj   r   r   lookup	get_tablerl   r   
queue_bind)r3   r   r   defaultexchange_inforetr|   r+   r,   r     s"   
zChannel._lookupc                 C  s   |  |}|| jvrdS | j| }tj| j| jdtj ddj|j	d}t
t tdd |D W  d   S 1 s=w   Y  d	S )
zReturn the number of messages in a queue.

        This is a *rough* estimation, as Pub/Sub doesn't provide
        an exact API.
        r   z;pubsub.googleapis.com/subscription/num_undelivered_messagesr#   )end_timeminutes)rZ   c                 s  s    | ]
}|j d  jjV  qdS )r   N)pointsvalueint64_value)r)   contentr+   r+   r,   	<genexpr>  s    
z Channel._size.<locals>.<genexpr>Nr>   )r   rf   r   Querymonitorrr   datetimenowselect_resourcesrZ   r   r   sum)r3   r   r   resultr+   r+   r,   _size  s&   



 zChannel._sizec           	        s|   |rt d| j|j}|d }|d }|d }td|| |d }| |g| | j| }|j	| t
 | dS )zAcknowledge one message.zmultiple acks not implementedr   r   r   z!ack message. queue: %s ack_id: %sr[   N)r   r   rU   r   rl   r   r   rf   r]   rB   rj   	basic_ack)	r3   delivery_tagmultipler   pubsub_messager   r   r[   r   r|   r+   r,   r     s   
zChannel.basic_ackack_ids	list[str]c                 C  s"   | j j||dt| jdd d S )N)r   r   r   )r   r   )r   acknowledger   r   )r3   r   r[   r+   r+   r,   r   #  s   

zChannel._do_ackc                 C  sH   |  |}| j|}|sdS | |}| jj|jtj dd |S )z'Delete all current messages in a queue.N)r   timer   )	r   rf   rU   r   r   seekr[   r   r   )r3   r   r   rQ   r+   r+   r,   _purge)  s   

zChannel._purgec              	   C  s   t  }td| | jd }t|| jd }| j|s[| j	
 D ]2}t|jdkr4td||j q"td||jt|jt|j | jj|jt|j| jdd q"| j|rtd	| d S )
Nz/unacked deadline extension thread: [%s] started      r   z'thread [%s]: no unacked messages for %sz5thread [%s]: extend ack deadline for %s: %d msgs [%s])r   r   r   r   z.unacked deadline extension thread [%s] stopped)rt   get_native_idrl   rm   _min_ack_deadlinemaxr   rw   r   rf   valuesrC   r]   r   r[   r<   r   modify_ack_deadline)r3   	thread_idmin_deadline_sleep
sleep_timer   r+   r+   r,   rv   9  sB   
z Channel._extend_unacked_deadlinec                 C  s8   |  |}| j| j|}td|| | j| d S )Nz0after_reply_message_received: queue: %s, sub: %s)r   r   r[   rr   rl   r   rg   r   )r3   r   subr+   r+   r,   after_reply_message_received\  s   
z$Channel.after_reply_message_receivedc                 C     t  S r/   )r   r2   r+   r+   r,   r   d     zChannel.subscriberc                 C  r  r/   )r   r2   r+   r+   r,   r   h  r  zChannel.publisherc                 C  s   t  S r/   )r   MetricServiceClientr2   r+   r+   r,   r   l     zChannel.monitorc                 C  s   | j jS r/   )
connectionclientr2   r+   r+   r,   rn   p  r  zChannel.conninfoc                 C  s
   | j jjS r/   )r  r  transport_optionsr2   r+   r+   r,   r  t  rG   zChannel.transport_optionsc                 C     | j d| jS )Nr   )r  rU   default_wait_time_secondsr2   r+   r+   r,   r   x     zChannel.wait_time_secondsc                 C  r  )Nr   )r  rU   default_retry_timeout_secondsr2   r+   r+   r,   r   ~  r  zChannel.retry_timeout_secondsc                 C  r  )Nr   )r  rU   default_ack_deadline_secondsr2   r+   r+   r,   r     r  zChannel.ack_deadline_secondsc                 C  s   | j ddS )Nr   zkombu-)r  rU   r2   r+   r+   r,   r     s   zChannel.queue_name_prefixc                 C  r  )Nr   )r  rU   default_expiration_secondsr2   r+   r+   r,   r     r  zChannel.expiration_secondsc                 C  r  )Nr   )r  rU   default_bulk_max_messagesr2   r+   r+   r,   r     r  zChannel.bulk_max_messagesc                   s   t d | jr3| j }tt t d| | jjd|id W d   n1 s+w   Y  | js| j	 sB| j
  tj  t   dS )zClose the channel.zclosing channelzdeleting subscription: %sr   r   N)rl   r   rg   r?   r   	Exceptionr   r   rs   rT   rw   setra   re   joinrj   close)r3   r  r|   r+   r,   r    s   





zChannel.closec                 C  s   | d  di  dd}|S )Nr   r   r    )rU   )r   r   r+   r+   r,   r     s
   zChannel._get_routing_key)rX   rW   r~   rW   r/   )rr   rW   r   rW   r   r   r~   rW   )rY   rW   r~   r   )NNNNNN)rr   rW   r   rW   rY   rW   r[   rW   r   r   r~   rW   )r   rW   r   r   )r   r   )r~   r   )r   rW   r~   r   )F)r   r   r[   rW   )r   rW   )=rH   rI   rJ   rK   supports_fanout
do_restorer  r  r  r  r  r  r  r   re   r^   rt   Eventrw   rL   rs   rf   rg   r4   CHARS_REPLACE_TABLEr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r  rv   r  r"   r   r   r   propertyrn   r  r   r   r   r   r   r   r  staticmethodr   __classcell__r+   r+   r|   r,   ra      s   
 C
(
+

3



#










ra   c                      s   e Zd ZdZeZdZdZejj	e
jf Z	ejjejejejejejf ejf ZdZdZejjjeddgdZ fd	d
Zdd ZedddZed dddZd!ddZdd Z dd Z!dd Z"  Z#S )"rp   zGCP Pub/Sub transport.Tg?gcpubsub	pubsub_v1r   r   )r   c                   s(   t  j|fi | t | _t | _d S r/   )rj   r4   r   _poolr   _get_bulk_future_to_queue)r3   r  r{   r|   r+   r,   r4     s   zTransport.__init__c                 C  s   t jS r/   )package_version__version__r2   r+   r+   r,   driver_version  s   zTransport.driver_versionurirW   r~   c                 C  s   |  dd }|dS )Nzgcpubsub://projects/r#   /)splitstrip)r/  r   r+   r+   r,   rq     s   
zTransport.parse_uriF**c                 C  s   |pdS )Nzgcpubsub://r+   )r3   r/  include_passwordmaskr+   r+   r,   as_uri  r  zTransport.as_uriNc                 C  sn   t  }| j}|r|r||kr|}	 z	| j|d W d S  ty5   |r-t  | |kr-t |r3t| Y nw q)Nr#   r   )r   polling_interval_drain_from_active_queuesr	   socket_timeoutr   )r3   r  r   
time_startr7  r+   r+   r,   drain_events  s    zTransport.drain_eventsc           	      C  s   |    | jdd t| j|td\}}dd |D }||8 }|D ]	}| j|d  q!|s0t tdt	| |D ],}|
 \}}|D ]}td| || jvrXtd| qD| || qD| j|d  q:d S )	Nrb   r   )r   return_whenc                 S  s   h | ]}|  r|qS r+   )	exceptionr)   fr+   r+   r,   	<setcomp>  s    z6Transport._drain_from_active_queues.<locals>.<setcomp>zgot %d done get_bulk tasksz consuming message from queue: %sz&Message for queue %s without consumers)_rm_empty_bulk_requests_submit_get_bulk_requestsr   r+  r   r?   r	   rl   r   rC   r   
_callbackswarning_deliver)	r3   r   doner&   emptyr?  r   payloadsr   r+   r+   r,   r8    s4   

z#Transport._drain_from_active_queuesc                 C  s,   dd | j D }|D ]	}| j |d  q
d S )Nc                 S  s    h | ]}|  r| r|qS r+   )rF  r=  r>  r+   r+   r,   r@    s    z4Transport._rm_empty_bulk_requests.<locals>.<setcomp>)r+  r?   )r3   rG  r?  r+   r+   r,   rA    s   z!Transport._rm_empty_bulk_requestsc                 C  sP   t | j }| jD ]}|jD ]}||v rq| j|j||}|| j|< qq
d S r/   )r  r+  r  channels_active_queuesr*  submitr   )r3   r   queues_with_submitted_get_bulkchannelr   futurer+   r+   r,   rB     s   

z#Transport._submit_get_bulk_requests)r/  rW   r~   rW   )Fr3  r/   )$rH   rI   rJ   rK   ra   can_parse_urlr7  r$   rp   connection_errorspubsub_exceptionsTimeoutErrorchannel_errorspublisher_exceptionsFlowControlLimitErrorMessageTooLargeErrorPublishError#PublishToPausedOrderingKeyExceptionsubscriber_exceptionsAcknowledgeErrordriver_typedriver_name
implementsr=   	frozensetr4   r.  r&  rq   classmethodr6  r;  r8  rA  rB  r'  r+   r+   r|   r,   rp     sD    

#	rp   )MrK   
__future__r   r_   r   stringrt   concurrent.futuresr   r   r   r   
contextlibr   osr   r   r	   r
   r   r   r   uuidr   r   _socketr   r   r9  google.api_core.exceptionsr   r   r   google.api_core.retryr   google.cloudr   google.cloud.monitoring_v3r   google.cloud.pubsub_v1r   r   r   rQ   google.cloud.pubsub_v1.publisherrT  !google.cloud.pubsub_v1.subscriberrY  google.pubsub_v1r   r,  kombu.entityr   	kombu.logr   kombu.utils.encodingr   r   kombu.utils.jsonr    r!   kombu.utils.objectsr"   r   r$   rl   r  punctuationPUNCTUATIONS_TO_REPLACEr(   r$  r.   rL   	dataclassrV   ra   rp   r+   r+   r+   r,   <module>   s\    ' 
    