o
    	hC,                     @  s   d Z ddlmZ ddlZddlmZ ddlmZ ddlm	Z	 ddl
mZ dd	lmZ dd
lmZ ddlmZ dZG dd de	Zdd Zdd ZdS )zAmazon SQS Connection.    )annotationsN)
Serializer)	transform)AsyncAWSQueryConnection)
AWSRequest   )boto3)AsyncMessage)
AsyncQueue)AsyncSQSConnectionc                      s
  e Zd ZdZd6 fdd	Zdd Zdd	 Zd7d
dZ	d7ddZd8ddZ	dd Z
d9ddZd:ddZ			d;ddZd:ddZd:ddZ	d:dd Z	d7d!d"Zd:d#d$Z	d:d%d&Zd:d'd(Zd<d*d+Zd:d,d-ZeZd.d/ Zd:d0d1Z	d:d2d3Zd:d4d5Z  ZS )=r   zAsync SQS Connection.r   Nc                   sH   t d u rtdt j|f||d| |d ur|| _d S dg| _d S )Nzboto3 is not installed)region_namedebugApproximateReceiveCount)r   ImportErrorsuper__init__fetch_message_attributes)selfsqs_connectionr   regionr   kwargs	__class__ g/var/www/html/optinet_system/venv/lib/python3.10/site-packages/kombu/asynchronous/aws/sqs/connection.pyr      s   
zAsyncSQSConnection.__init__c                 C  s^   |  }|r
||d< d|i}i }| dkrd|i}| dkr$d|d< td	|||d|S )
NActiondatagetparamspostz0application/x-www-form-urlencoded; charset=utf-8Content-Type)methodurlheadersr   )copylowerr   )r   	operationr   	queue_urlr!   param_payloadr#   r   r   r   _create_query_request$   s   z(AsyncSQSConnection._create_query_requestc                 C  s   |  }||d< | jjj}||}| jjj}i }|jd }d| }	|	|d< d|jd |j	}
|
|d< t
| |d}|jd	tj}td||d
|S )NQueueUrljsonVersionzapplication/x-amz-json-r    z{}.{}targetPrefixzX-Amz-Target)r   r#   r!   )r!   r"   r   )r$   r   metaservice_modeloperation_model	_endpointhostmetadataformatnamejsondumpsencodehttpr   r   DEFAULT_METHODr   )r   r&   r   r'   r.   r/   r"   r#   json_versioncontent_typetargetr(   r!   r   r   r   _create_json_request5   s0   




z'AsyncSQSConnection._create_json_requestc                 C  s   | j j}| j jj}|j}	i |pi ||	i }
|	dkr%| ||
||}n|	dkr1| ||
|}ntd|	 d|j	
 dkrBdnd}|j|||d | }| j||d	S )
a  Override make_request to support different protocols.

        botocore has changed the default protocol of communicating
        with SQS backend from 'query' to 'json', so we need a special
        implementation of make_request for SQS. More information on this can
        be found in: https://github.com/celery/kombu/pull/1807.

        protocol_params: Optional[dict[str, dict]] of per-protocol additional parameters.
            Supported for the SQS query to json protocol transition.
        queryr5   zUnsupported protocol: .r   zpresign-urlstandard)signing_typecallback)r   _request_signerr-   r.   protocolr   r)   r=   	Exceptionr!   r%   signprepare_mexe)r   operation_namer   r'   verbrC   protocol_paramssignerr.   rE   
all_paramsrequestrA   prepared_requestr   r   r   make_requestW   s&   
zAsyncSQSConnection.make_requestc                 C  s*   d|i}|rt |d|d< | jd||dS )N	QueueNamedDefaultVisibilityTimeoutCreateQueuerB   )r3   
get_object)r   
queue_namevisibility_timeoutrC   r   r   r   r   create_queuey   s   zAsyncSQSConnection.create_queueFc                 C  s   | j dd |j|dS )NDeleteQueuerB   
get_statusid)r   queueforce_deletionrC   r   r   r   delete_queue   s   zAsyncSQSConnection.delete_queuec                 C  s   | j j|d}|d S )N)rR   r*   )r   get_queue_url)r   r^   resr   r   r   ra      s   z AsyncSQSConnection.get_queue_urlAllc                 C     | j dd|i|j|dS )NGetQueueAttributesAttributeNamerB   )rV   r]   )r   r^   	attributerC   r   r   r   get_queue_attributes   s   z'AsyncSQSConnection.get_queue_attributesc              	   C  s*   | j di |j|d||ii||dddS )NSetQueueAttribute
Attributes)zAttribute.NamezAttribute.Valuer5   r>   )rC   rL   r[   )r   r^   rg   valuerC   r   r   r   set_queue_attribute   s   
z&AsyncSQSConnection.set_queue_attributer   c              	   C  s   d|i}i i d}	|d ur|n| j }
|r||d< |
r2|	d dt|
i |	d tdt|
i |d ur:||d< | jd	|d
tfg||||	dS )NMaxNumberOfMessages)r>   r5   VisibilityTimeoutr5   AttributeNamesr>   rf   WaitTimeSecondsReceiveMessageMessage)rC   parentrL   )r   updatelist_query_object_encodeget_listr	   )r   r^   r'   number_messagesrX   
attributeswait_time_secondsrC   r   proto_paramsattrsr   r   r   receive_message   s   
z"AsyncSQSConnection.receive_messagec                 C  s   |  |||S N)delete_message_from_handler   r^   receipt_handlerC   r   r   r   delete_message   s   z!AsyncSQSConnection.delete_messagec                 C  sB   ddd |D it ddd |D id}| jdi |jd||d	S )
NEntriesc                 S     g | ]	}|j |jd qS )IdReceiptHandler]   r   .0mr   r   r   
<listcomp>   s    z;AsyncSQSConnection.delete_message_batch.<locals>.<listcomp>DeleteMessageBatchRequestEntryc                 S  r   r   r   r   r   r   r   r      s    rk   DeleteMessageBatchPOSTrK   rC   rL   rw   rV   r]   )r   r^   messagesrC   p_paramsr   r   r   delete_message_batch   s   z'AsyncSQSConnection.delete_message_batchc                 C  s   | j dd|i||dS )NDeleteMessager   rB   )r\   r   r   r   r   r      s   z-AsyncSQSConnection.delete_message_from_handlec                 C  s.   d|i}|rt ||d< | jd||jd|dS )NMessageBodyDelaySecondsSendMessager   rK   rC   )intrV   r]   )r   r^   message_contentdelay_secondsrC   r   r   r   r   send_message   s   zAsyncSQSConnection.send_messagec              
   C  sn   i }t |D ]%\}}d|d  }|| d|d | d|d | d|d i q| jd||jd	|d
S )NzSendMessageBatchRequestEntry.r   z.Idr   z.MessageBodyz.DelaySeconds   SendMessageBatchr   r   )	enumerateru   rV   r]   )r   r^   r   rC   r   imsgprefixr   r   r   send_message_batch   s   z%AsyncSQSConnection.send_message_batchc                 C  s   | j d||d|j|dS )NChangeMessageVisibility)r   ro   rB   r[   )r   r^   r   rX   rC   r   r   r   change_message_visibility   s   z,AsyncSQSConnection.change_message_visibilityc                 C  s<   dd |D }d|it d|id}| jdi |jd||dS )	Nc                 S  s(   g | ]}|d  j |d  j|d dqS )r   r   )r   r   ro   r   )r   tr   r   r   r      s    zFAsyncSQSConnection.change_message_visibility_batch.<locals>.<listcomp>r   (ChangeMessageVisibilityBatchRequestEntryrk   ChangeMessageVisibilityBatchr   r   r   )r   r^   r   rC   entriesr   r   r   r   change_message_visibility_batch   s   
z2AsyncSQSConnection.change_message_visibility_batch c                 C  s(   i }|r||d< | j d|dtfg|dS )NQueueNamePrefix
ListQueuesr*   rB   )rx   r
   )r   r   rC   r   r   r   r   get_all_queues  s   z!AsyncSQSConnection.get_all_queuesc                 C  s   |  |t| j||S r   )r   r   _on_queue_ready)r   rW   rC   r   r   r   	get_queue  s   zAsyncSQSConnection.get_queuec                   s   t  fdd|D d S )Nc                 3  s     | ]}|j  r|V  qd S r   )r"   endswith)r   qr4   r   r   	<genexpr>  s    z5AsyncSQSConnection._on_queue_ready.<locals>.<genexpr>)next)r   r4   queuesr   r   r   r     s   z"AsyncSQSConnection._on_queue_readyc                 C  s   | j dd|jidtfg|dS )NListDeadLetterSourceQueuesr*   rB   )rx   r"   r
   )r   r^   rC   r   r   r   get_dead_letter_source_queues  s
   
z0AsyncSQSConnection.get_dead_letter_source_queuesc                 C  s   | j d|||d|j|dS )NAddPermission)LabelAWSAccountId
ActionNamerB   r[   )r   r^   labelaws_account_idaction_namerC   r   r   r   add_permission   s   z!AsyncSQSConnection.add_permissionc                 C  rd   )NRemovePermissionr   rB   r[   )r   r^   r   rC   r   r   r   remove_permission*  s   z$AsyncSQSConnection.remove_permission)r   NN)NN)FN)rc   Nr   )r   NNNN)r   N)__name__
__module____qualname____doc__r   r)   r=   rQ   rY   r`   ra   rh   rm   r~   r   r   r   r   r   r   r   r   r   lookupr   r   r   r   __classcell__r   r   r   r   r      sD    
"#













	

	


r   c                 C  s"   i }t |d|  dd | D S )Nr   c                 S  s   i | ]\}}||qS r   r   )r   kvr   r   r   
<dictcomp>3  s    z(_query_object_encode.<locals>.<dictcomp>)_query_object_encode_partitems)r   r   r   r   r   rw   0  s   rw   c                 C  s   |r| dn|}t |ttfr't|D ]\}}t| | |d  | qd S t |trA| D ]\}}t| | | | q0d S t|| |< d S )Nr?   r   )
isinstancerv   tupler   r   dictr   str)r   r   partdottedr   itemkeyrl   r   r   r   r   6  s   
r   )r   
__future__r   r5   botocore.serializer   viner   !kombu.asynchronous.aws.connectionr   kombu.asynchronous.aws.extr   extr   messager	   r^   r
   __all__r   rw   r   r   r   r   r   <module>   s       