o
    h6#                     @   s   d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 zddl
Z
ddlZ
ddlZ
ddlZ
W n ey;   dZ
Y nw dZeeZd	Zd
ZdZdZdZdZdZdZdd ZG dd de	ZdS )z@Apache Cassandra result store backend using the DataStax driver.    N)states)ImproperlyConfigured)
get_logger   )BaseBackend)CassandraBackendz
You need to install the cassandra-driver library to
use the Cassandra backend.  See https://github.com/datastax/python-driver
z
CASSANDRA_AUTH_PROVIDER you provided is not a valid auth_provider class.
See https://datastax.github.io/python-driver/api/cassandra/auth.html.
z(Cassandra backend improperly configured.z!Cassandra backend not configured.z
INSERT INTO {table} (
    task_id, status, result, date_done, traceback, children) VALUES (
        %s, %s, %s, %s, %s, %s) {expires};
z]
SELECT status, result, date_done, traceback, children
FROM {table}
WHERE task_id=%s
LIMIT 1
z
CREATE TABLE {table} (
    task_id text,
    status text,
    result blob,
    date_done timestamp,
    traceback blob,
    children blob,
    PRIMARY KEY ((task_id), date_done)
) WITH CLUSTERING ORDER BY (date_done DESC);
z
    USING TTL {0}
c                 C   s
   t | dS )Nutf8)bytes)x r   [/var/www/html/optinet_system/venv/lib/python3.10/site-packages/celery/backends/cassandra.pybuf_tC   s   
r   c                       sh   e Zd ZdZdZdZdZ		d fdd	ZdddZ	dd	d
Z	dddZ
dd Zd fdd	Z  ZS )r   aG  Cassandra/AstraDB backend utilizing DataStax driver.

    Raises:
        celery.exceptions.ImproperlyConfigured:
            if module :pypi:`cassandra-driver` is not available,
            or not-exactly-one of the :setting:`cassandra_servers` and
            the :setting:`cassandra_secure_bundle_path` settings is set.
    NTc                    s  t  jdi | tstt| jj}|p|dd | _|p#|dd | _	|p.|dd p.d| _
|p7|dd | _|p@|dd | _|di | _| jpN| j	}	|	rW| jrW| js[tt| jre| j	rett|pl|dd }
|
d urvt|
nd	| _|d
pd}|dpd}ttj|tjj| _ttj|tjj| _d | _|dd }|dd }|r|rttj|d }|stt|di || _d | _d | _d | _d | _t  | _!d S )Ncassandra_serverscassandra_secure_bundle_pathcassandra_portiR#  cassandra_keyspacecassandra_tablecassandra_optionscassandra_entry_ttl cassandra_read_consistencyLOCAL_QUORUMcassandra_write_consistencycassandra_auth_providercassandra_auth_kwargsr   )"super__init__	cassandrar   E_NO_CASSANDRAappconfgetserversbundle_pathportkeyspacetabler   E_CASSANDRA_NOT_CONFIGUREDE_CASSANDRA_MISCONFIGURED	Q_EXPIRESformat
cqlexpiresgetattrConsistencyLevelr   read_consistencywrite_consistencyauth_providerauth!E_NO_SUCH_CASSANDRA_AUTH_PROVIDER_cluster_session_write_stmt
_read_stmt	threadingRLock_lock)selfr"   r%   r&   	entry_ttlr$   r#   kwargsr    db_directionsexpires	read_cons
write_consr0   auth_kwargsauth_provider_class	__class__r   r   r   X   sV   zCassandraBackend.__init__Fc                 C   sz  | j durdS | j  zz| j durW W | j  dS | jr2tjj| jf| j| j	d| j
| _ntjjdd| ji| j	d| j
| _| j| j| _ tjtj| j| jd| _| j| j_tjtj| jd| _| j| j_|rtjtj| jd}| j|_z| j | W n
 tjy   Y nw W n tjy   | jdur| j  d| _d| _  w W | j  dS | j  w )zjPrepare the connection for action.

        Arguments:
            write (bool): are we a writer?
        N)r$   r0   secure_connect_bundle)cloudr0   )r&   r>   )r&   r   ) r4   r9   acquirereleaser"   r   clusterClusterr$   r0   r   r3   r#   connectr%   querySimpleStatementQ_INSERT_RESULTr*   r&   r+   r5   r/   consistency_levelQ_SELECT_RESULTr6   r.   Q_CREATE_RESULT_TABLEexecuteAlreadyExistsOperationTimedOutshutdown)r:   write	make_stmtr   r   r   _get_connection   sl   


;


	


z CassandraBackend._get_connectionc                 K   sV   | j dd | j| j||t| || j t| |t| | |f dS )z1Store return value and state of an executed task.T)rV   N)	rX   r4   rR   r5   r   encoder   nowcurrent_task_children)r:   task_idresultstate	tracebackrequestr<   r   r   r   _store_result   s   

zCassandraBackend._store_resultc                 C   s   dS )Nzcassandra://r   )r:   include_passwordr   r   r   as_uri   s   zCassandraBackend.as_uric              
   C   sf   |    | j| j|f }|stjddS |\}}}}}| ||| ||| || |dS )z$Get task meta-data for a task by id.N)statusr]   )r\   rd   r]   	date_doner_   children)	rX   r4   rR   r6   oner   PENDINGmeta_from_decodeddecode)r:   r\   resrd   r]   re   r_   rf   r   r   r   _get_task_meta_for   s   z#CassandraBackend._get_task_meta_forr   c                    s2   |si n|}| | j| j| jd t ||S )N)r"   r%   r&   )updater"   r%   r&   r   
__reduce__)r:   argsr<   rC   r   r   rn      s   zCassandraBackend.__reduce__)NNNNNN)F)NN)T)r   N)__name__
__module____qualname____doc__r"   r#   supports_autoexpirer   rX   ra   rc   rl   rn   __classcell__r   r   rC   r   r   G   s    

6I

r   )rs   r7   celeryr   celery.exceptionsr   celery.utils.logr   baser   r   cassandra.authcassandra.clustercassandra.queryImportError__all__rp   loggerr   r2   r(   r'   rN   rP   rQ   r)   r   r   r   r   r   r   <module>   s4    