o
    }jhI                     @  sD  d dl mZ d dlZd dlZd dlZd dlmZmZmZm	Z	m
Z
mZmZ d dlmZ d dlmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZm Z  ddl!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1 ddl2m3Z3 d	d
l4m5Z5 d	dl6m7Z7 d	dl8m9Z9 erd	dl:m;Z; e<e=Z>G dd dZ?dS )    )annotationsN)TYPE_CHECKINGAnyCallableDictListMappingOptional)assert_never)BroadcastCallbackBroadcastPayloadCallbackChannelEventsChannelStatesPostgresChangesCallbackPostgresChangesDataPresenceOnJoinCallbackPresenceOnLeaveCallbackRealtimeAcknowledgementStatusRealtimeChannelBroadcastConfigRealtimeChannelConfigRealtimeChannelOptionsRealtimeChannelPresenceConfig"RealtimePostgresChangesListenEventRealtimePresenceStateRealtimeSubscribeStates   )BroadcastMessageChannelCloseMessageChannelErrorMessageHeartbeatMessageMessagePostgresChangesMessagePostgresChangesPayloadPostgresRowChangePresenceDiffMessagePresenceStateMessageReplyMessageReplyPostgresChangesServerMessageSuccessReplyMessageSuccessSystemPayloadSystemMessage)http_endpoint_url   )AsyncRealtimePresence)	AsyncPush)
AsyncTimer)AsyncRealtimeClientc                   @  s>  e Zd ZdZ	d]d^ddZdd Zd_ddZedd Zedd Z	edd Z
edd Zedd Z	d]d`dd Zdad!d"Z	d]dbd(d)Zdcd*d+Zddd-d.Z			dedfd5d6Zdgd8d9Zdhd;d<Zdad=d>Zdid@dAZdjdCdDZdkdFdGZdldIdJZdmdMdNZdOdP ZdadQdRZdSdT ZdmdUdVZdndYdZZd[d\ Z dS )oAsyncRealtimeChannela  
    Channel is an abstraction for a topic subscription on an existing socket connection.
    Each Channel has its own topic and a list of event-callbacks that respond to messages.
    Should only be instantiated through `AsyncRealtimeClient.channel(topic)`.
    Nsocketr2   topicstrparams Optional[RealtimeChannelOptions]returnNonec                   s   | _ |r|nddddddiddi _| _d _t  _tj _g  _	 j j
 _
t tj j _i  _g  _g  _g  _d _t jdd	  _   _d fdd} fdd} jtj|tj| dS )z
        Initialize the Channel object.

        :param socket: RealtimeClient object
        :param topic: Topic that it subscribes to on the realtime server
        :param params: Optional parameters for connection.
        configF)ackselfkey )	broadcastpresenceprivateNc                 S  s   d|  S )Nr    )triesrC   rC   M/var/www/html/bot/env/lib/python3.10/site-packages/realtime/_async/channel.py<lambda>n   s    z/AsyncRealtimeChannel.__init__.<locals>.<lambda>payloadr(   c                   s6   t j _ j   jD ]	}t|  qg  _d S N)	r   JOINEDstaterejoin_timerreset_push_bufferasynciocreate_tasksend)rG   pushr=   rC   rE   on_join_push_oks   s
   


z6AsyncRealtimeChannel.__init__.<locals>.on_join_push_okc                     s2    j sd S td j  tj _ j  d S )Nzjoin push timeout for channel )	
is_joiningloggererrorr5   r   ERROREDrJ   rK   schedule_timeoutrC   rR   rC   rE   on_join_push_timeoutz   s
   z;AsyncRealtimeChannel.__init__.<locals>.on_join_push_timeoutrG   r(   )r4   r7   r5   _joined_oncer/   rA   r   CLOSEDrJ   rM   timeoutr0   r   join	join_pushmessages_waiting_for_ackbroadcast_callbackssystem_callbackspostgres_changes_callbackssubscribe_callbackr1   _rejoin_until_connectedrK   _broadcast_endpoint_urlbroadcast_endpoint_urlreceiver   OkTimeout)r=   r4   r5   r7   rS   rY   rC   rR   rE   __init__D   sB   


zAsyncRealtimeChannel.__init__c                 C  s6   t d| j d | j  tj| _| j	|  d S )Nchannel z closed)
rU   infor5   rK   rL   r   r\   rJ   r4   _remove_channelrR   rC   rC   rE   on_close   s   
zAsyncRealtimeChannel.on_closerG   dict[str, Any]c                 C  s>   | j s| jrd S td| j d|  tj| _| j	  d S )Nrl   z error: )

is_leaving	is_closedrU   rm   r5   r   rW   rJ   rK   rX   )r=   rG   rC   rC   rE   on_error   s
   zAsyncRealtimeChannel.on_errorc                 C     | j tjkS rH   )rJ   r   r\   rR   rC   rC   rE   rr         zAsyncRealtimeChannel.is_closedc                 C  rt   rH   )rJ   r   JOININGrR   rC   rC   rE   rT      ru   zAsyncRealtimeChannel.is_joiningc                 C  rt   rH   )rJ   r   LEAVINGrR   rC   rC   rE   rq      ru   zAsyncRealtimeChannel.is_leavingc                 C  rt   rH   )rJ   r   rW   rR   rC   rC   rE   
is_errored   ru   zAsyncRealtimeChannel.is_erroredc                 C  rt   rH   )rJ   r   rI   rR   rC   rC   rE   	is_joined   ru   zAsyncRealtimeChannel.is_joinedcallbackHOptional[Callable[[RealtimeSubscribeStates, Optional[Exception]], None]]c           
        s   j jsj  I dH  jrtdjd }|d}|d}|dd}d|||dd	 jD d
i}j jrBj j|d< j	
| d_d fdd}d fdd} fdd}	j	tj|tj|tj|	  I dH  S )a  
        Subscribe to the channel. Can only be called once per channel instance.

        :param callback: Optional callback function that receives subscription state updates
                        and any errors that occur during subscription
        :return: The Channel instance for method chaining
        :raises: Exception if called multiple times on the same channel instance
        NzdTried to subscribe multiple times. 'subscribe' can only be called a single time per channel instancer;   r@   rA   rB   Fc                 S     g | ]}|j qS rC   binding_filter.0crC   rC   rE   
<listcomp>       z2AsyncRealtimeChannel.subscribe.<locals>.<listcomp>r@   rA   rB   postgres_changesaccess_tokenTrG   r(   c                   s   | j }g }|rbtjD ]U\}}|t|k r|| nd }t| d|  |rJ|j|jkrJ|j|j	krJ|j
|j
krJ|j|jkrJ|j|_|| qt   o^ tjtd  d S   d S |_ oo tjd  d S  d S )Nz, z@mismatch between server and client bindings for postgres changes)r   	enumeraterc   lenrU   rm   eventseventschema_schematablefilteridappendrN   rO   unsubscriber   CHANNEL_ERROR	Exception
SUBSCRIBED)rG   server_postgres_changesnew_postgres_bindingsipostgres_callbackserver_bindingrz   r=   rC   rE   rS      s>   z7AsyncRealtimeChannel.subscribe.<locals>.on_join_push_okDict[str, Any]c                   s$    o t jtt|  d S  d S rH   )r   r   r   jsondumps)rG   rz   rC   rE   on_join_push_error   s   z:AsyncRealtimeChannel.subscribe.<locals>.on_join_push_errorc                    s    o
 t jd  d S  d S rH   )r   	TIMED_OUTargsr   rC   rE   rY     s   z<AsyncRealtimeChannel.subscribe.<locals>.on_join_push_timeoutrZ   )rG   r   )r4   is_connectedconnectr[   r   r7   getrc   r   r_   update_payloadrh   r   ri   Errorrj   _rejoin)
r=   rz   r;   r@   rA   rB   config_payloadrS   r   rY   rC   r   rE   	subscribe   sF   


&zAsyncRealtimeChannel.subscribec                   sd   t j _ j   j  d fdd}t tj	i }|
tj|
tj| | I dH  dS )z
        Unsubscribe from the channel and leave the topic.
        Sets channel state to LEAVING and cleans up timers and pushes.
        r9   r:   c                    s    t d j d    d S )Nrl   z leave)rU   rm   r5   ro   r   rR   rC   rE   _close  s   z0AsyncRealtimeChannel.unsubscribe.<locals>._closeNr9   r:   )r   rw   rJ   rK   rL   r_   destroyr0   r   leaverh   r   ri   r   rP   )r=   r   
leave_pushrC   rR   rE   r     s   

z AsyncRealtimeChannel.unsubscriber   r   r]   Optional[int]r0   c                   s|   | j std| d| j d|p| j}t| |||}|  r2| I dH  |jdus0J d|S |  | j	
| |S )aN  
        Push a message to the channel.

        :param event: The event name to push
        :param payload: The payload to send
        :param timeout: Optional timeout in milliseconds
        :return: AsyncPush instance representing the push operation
        :raises: Exception if called before subscribing to the channel
        ztried to push 'z' to 'z?' before joining. Use channel.subscribe() before pushing eventsNz Sent AsyncPush should have a ref)r[   r   r5   r]   r0   	_can_pushrP   refstart_timeoutrM   r   )r=   r   rG   r]   rQ   rC   rC   rE   rQ   #  s   
zAsyncRealtimeChannel.pushc                   s~   | j d }|d}|d}|dd}d|||dd | jD di}t| jtjd|i| j d	}| j	|I d
H  | S )zx
        Coroutine that attempts to join Phoenix Realtime server via a certain topic.

        :return: Channel
        r;   r@   rA   rB   Fc                 S  r|   rC   r}   r   rC   rC   rE   r   P  r   z-AsyncRealtimeChannel.join.<locals>.<listcomp>r   )r5   r   rG   r   N)
r7   r   rc   r!   r5   r   r^   r4   	_make_refrP   )r=   r;   r@   rA   rB   r   messagerC   rC   rE   r^   @  s,   



zAsyncRealtimeChannel.join"Callable[[BroadcastPayload], None]c                 C  s   | j t||d | S )a"  
        Set up a listener for a specific broadcast event.

        :param event: The name of the broadcast event to listen for
        :param callback: Function called with the payload when a matching broadcast is received
        :return: The Channel instance for method chaining
        )rz   r   )ra   r   r   )r=   r   rz   rC   rC   rE   on_broadcast^  s   

z!AsyncRealtimeChannel.on_broadcastr   (Callable[[PostgresChangesPayload], None]r   Optional[str]r   r   c                 C  s"   t |||||d}| j| | S )a  
        Set up a listener for Postgres database changes.

        :param event: The type of database event to listen for (INSERT, UPDATE, DELETE, or *)
        :param callback: Function called with the payload when a matching change is detected
        :param table: The table name to monitor. Defaults to "*" for all tables
        :param schema: The database schema to monitor. Defaults to "public"
        :param filter: Optional filter string to apply
        :return: The Channel instance for method chaining
        )rz   r   r   r   r   )r   rc   r   )r=   r   rz   r   r   r   rC   rC   rE   on_postgres_changesm  s
   
z(AsyncRealtimeChannel.on_postgres_changes&Callable[[SuccessSystemPayload], None]c                 C     | j | | S )z
        Set up a listener for system events.

        :param callback: The callback function to execute when a system event is received.
        :return: The Channel instance for method chaining.
        )rb   r   r=   rz   rC   rC   rE   	on_system     	zAsyncRealtimeChannel.on_systemuser_statusc                   s   |  d|I dH  dS )z
        Track presence status for the current user.

        :param user_status: Dictionary containing the user's presence information
        trackNsend_presence)r=   r   rC   rC   rE   r     s   zAsyncRealtimeChannel.trackc                   s   |  di I dH  dS )z>
        Stop tracking presence for the current user.
        untrackNr   rR   rC   rC   rE   r     s   zAsyncRealtimeChannel.untrackr   c                 C  s   | j jS )z
        Get the current state of presence on this channel.

        :return: Dictionary mapping presence keys to lists of presence payloads
        )rA   rJ   rR   rC   rC   rE   presence_state  s   z#AsyncRealtimeChannel.presence_stateCallable[[], None]c                 C  r   )z
        Register a callback for presence sync events.

        :param callback: The callback function to execute when a presence sync event occurs.
        :return: The Channel instance for method chaining.
        )rA   on_syncr   rC   rC   rE   on_presence_sync  s   z%AsyncRealtimeChannel.on_presence_syncr   c                 C  r   )z
        Register a callback for presence join events.

        :param callback: The callback function to execute when a presence join event occurs.
        :return: The Channel instance for method chaining.
        )rA   on_joinr   rC   rC   rE   on_presence_join  r   z%AsyncRealtimeChannel.on_presence_joinr   c                 C  r   )z
        Register a callback for presence leave events.

        :param callback: The callback function to execute when a presence leave event occurs.
        :return: The Channel instance for method chaining.
        )rA   on_leaver   rC   rC   rE   on_presence_leave  r   z&AsyncRealtimeChannel.on_presence_leavedatar   c                   s"   |  tjd||dI dH  dS )z
        Send a broadcast message through this channel.

        :param event: The name of the broadcast event
        :param data: The payload to broadcast
        r@   )typer   rG   N)rQ   r   r@   r=   r   r   rC   rC   rE   send_broadcast  s
   
z#AsyncRealtimeChannel.send_broadcastc                 C  s   t | jj dS )Nz/api/broadcast)r-   r4   http_endpointrR   rC   rC   rE   rf     s   z,AsyncRealtimeChannel._broadcast_endpoint_urlc                   s<   | j rd S | j| jI d H  tj| _| j I d H  d S rH   )	rq   r4   _leave_open_topicr5   r   rv   rJ   r_   resendrR   rC   rC   rE   r     s   zAsyncRealtimeChannel._rejoinc                 C  s   | j jo| jS rH   )r4   r   r[   rR   rC   rC   rE   r     s   zAsyncRealtimeChannel._can_pushc                   s    |  tj||dI d H  d S )N)r   rG   )rQ   r   rA   r   rC   rC   rE   r     s   z"AsyncRealtimeChannel.send_presencer   r)   c           	      C  s  t | j d|  t|tr-t|jtr#| jD ]}||j qd S | t	|j d S t|t
r^|j}|jrZ| j|j }r\|jdkrP|tj|j d S |tj|j d S d S d S t|trr|j}| jD ]}|| qid S t|tr| j|j d S t|tr| j|j d S t|tr|j}| jD ]}|| qd S t|tr| |j d S t|tr|   d S t|t rd S t!| d S )Nz : ok)"rU   rm   r5   
isinstancer,   rG   r+   rb   rs   dictr'   r   r`   popstatustriggerr   ri   responser   r   ra   r&   rA   _on_state_eventr%   _on_diff_eventr"   rc   r   r   ro   r    r
   )	r=   r   rz   reply_payloadrQ   broadcast_payloadbroadcast_callbackrG   r   rC   rC   rE   _handle_message  sN   




	









z$AsyncRealtimeChannel._handle_messagec                   s*   | j   | jjr|  I d H  d S d S rH   )rK   rX   r4   r   r   rR   rC   rC   rE   re     s
   
z,AsyncRealtimeChannel._rejoin_until_connectedrH   )r4   r2   r5   r6   r7   r8   r9   r:   )rG   rp   )rz   r{   r9   r3   r   )r   r6   rG   r   r]   r   r9   r0   )r9   r3   )r   r6   rz   r   r9   r3   )NNN)r   r   rz   r   r   r   r   r   r   r   r9   r3   )rz   r   r9   r3   )r   r   r9   r:   )r9   r   )rz   r   r9   r3   )rz   r   r9   r3   )rz   r   r9   r3   )r   r6   r   r   r9   r:   )r   r)   )!__name__
__module____qualname____doc__rk   ro   rs   propertyrr   rT   rq   rx   ry   r   r   rQ   r^   r   r   r   r   r   r   r   r   r   r   rf   r   r   r   r   re   rC   rC   rC   rE   r3   =   sR    
B
	





e













(r3   )@
__future__r   rN   r   loggingtypingr   r   r   r   r   r   r	   typing_extensionsr
   realtime.typesr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r    r!   r"   r#   r$   r%   r&   r'   r(   r)   r*   r+   r,   transformersr-   rA   r/   rQ   r0   timerr1   clientr2   	getLoggerr   rU   r3   rC   rC   rC   rE   <module>   s     $LH
