o
    Me£"  ã                   @   sr   d dl mZ 	 d dlmZ ddlmZ g d¢ZG dd„ deƒZG dd	„ d	eƒZ	G d
d„ deƒZ
G dd„ deƒZdS )é   )Úglobal_scope)Úcoreé    )ÚDistributedMode)ÚCommunicatorÚFLCommunicatorÚLargeScaleKVc                   @   s|   e Zd Zddd„Z	ddd„Z			d d	d
„Zdd„ Zdd„ Zdd„ Zdd„ Z	dd„ Z
dd„ Zdd„ Zdd„ Zd!dd„ZdS )"r   Nc                 C   sÖ   |dkr|dkr
i }n.|t jkrd |d ¡|d< t|d ƒ|d< t|d ƒ|d< t|d ƒ|d< t|d ƒ|d< d}|t jkrCd}n|t jkrKd	}n|t jkrSd
}n|t jkrZd}|| _|| _d| _	d| _
d| _dS )a¢  
        Communicator is used for async distribute training in distribute_transpiler mode.
        It's a wrapper of a cpp class Communicator and should be used inside fleet API.

        Args:
            program(Program): the trainers program after transpile of distribute_transpiler.
            It's used by communicator to extract the information to do communication.

        Returns:
            None

        Examples:
            .. code-block:: python

                import paddle.fluid as fluid

                prog = fluid.Program()
                comm = fluid.communicator.Communicator(prog)
                comm.start()
                comm.stop()
        Nú,Zpserver_endpointsZtrainersÚ
trainer_idZneed_global_stepZbarrier_table_idÚSYNCÚASYNCÚ
HALF_ASYNCÚGEO)r   r   ÚjoinÚstrr   r   r   ÚmodeÚenvsÚcommunicator_Ú	send_ctx_Ú	recv_ctx_)Úselfr   Úkwargsr   Zmode_str© r   úID:\Projects\ConvertPro\env\Lib\site-packages\paddle/fluid/communicator.pyÚ__init__*   s4   €
ÿ




zCommunicator.__init__c              	   C   s:   |d krt ƒ }t | j|||||| j¡| _|| _|| _d S ©N)r   r   ZDistCommunicatorr   r   r   r   r   )r   Úsend_ctxZrecv_ctxZ	proto_txtZunit64_hostsÚscoper   r   r   Úinit_with_ctx`   s   
þ
zCommunicator.init_with_ctxé ¡ é'  é   c                 C   s   | j  |||¡ d S r   )r   Ú"create_client_to_client_connection)r   Zpserver_timeout_msZpserver_connect_timeout_msZ	max_retryr   r   r   r"   n   s   ÿz/Communicator.create_client_to_client_connectionc                 C   s
   | j  ¡ S r   )r   Úget_client_info©r   r   r   r   r#   u   s   
zCommunicator.get_client_infoc                 C   ó   | j  |¡ d S r   )r   Úset_clients)r   Z	host_listr   r   r   r&   x   ó   zCommunicator.set_clientsc                 C   ó$   | j dkrtdƒ dS | j  ¡  dS )ao  
        Start communicator. Should call before training process.

        Returns:
            None

        Examples:
            .. code-block:: python

                import paddle.fluid as fluid

                prog = fluid.Program()
                comm = fluid.communicator.Communicator(prog)
                comm.start()
                comm.stop()
        Nz;you must call init_with_ctx first to init comm before start)r   ÚprintÚstartr$   r   r   r   r*   {   ó   
zCommunicator.startc                 C   r(   )am  
        Stop communicator. Should call after training process.

        Returns:
            None

        Examples:
            .. code-block:: python

                import paddle.fluid as fluid

                prog = fluid.Program()
                comm = fluid.communicator.Communicator(prog)
                comm.start()
                comm.stop()
        Nú:you must call init_with_ctx first to init comm before stop)r   r)   Ústopr$   r   r   r   r-   ‘   r+   zCommunicator.stopc                 C   r(   )aD  
        Get communicator is running or stop.

        Returns:
            bool

        Examples:
            .. code-block:: python

                import paddle.fluid as fluid

                prog = fluid.Program()
                comm = fluid.communicator.Communicator(prog)
                comm.is_running()
        Nr,   )r   r)   Ú
is_runningr$   r   r   r   r.   §   s   
zCommunicator.is_runningc                 C   ó   | j  ¡  d S r   )r   Úrecvr$   r   r   r   r0   ¼   ó   zCommunicator.recvc                 C   r%   r   )r   Úinit_params©r   Úcontextr   r   r   r2   ¿   r'   zCommunicator.init_paramsc                 C   r%   r   )r   Ú
pull_denser3   r   r   r   r5   Â   r'   zCommunicator.pull_denseéÿÿÿÿc                 C   sd   |d krt ƒ }|  ¡ stdƒ‚t|tƒsJ ‚t|tƒsJ ‚|dkr(| j|  ¡ }| j 	|||¡ d S )NzTCommunicator should init first. Using fleet.init_worker() before push_sparse_param()r6   )
r   r.   Ú
ValueErrorÚ
isinstancer   Úintr   Útable_idr   Úpush_sparse_param)r   Úvar_namer:   r   r   r   r   r;   Å   s   ÿzCommunicator.push_sparse_param)NNr   )r   r    r!   )r6   N)Ú__name__Ú
__module__Ú__qualname__r   r   r"   r#   r&   r*   r-   r.   r0   r2   r5   r;   r   r   r   r   r   (   s"    
;
û
ýr   c                       s6   e Zd Zd
‡ fdd„	Zdd„ Zdd„ Zdd	„ Z‡  ZS )r   Nc                    s<   d }t t| ƒ ||¡ i }i }d}d| _|  ||||¡ d S )NÚ ZWITH_COORDINATOR)Úsuperr   r   r   r   )r   Zps_hostsr   r   r   Z	dense_mapZprototxt©Ú	__class__r   r   r   Õ   s   zFLCommunicator.__init__c                 C   s   | j d kr| j  ||¡ d S r   )r   Ústart_coordinator)r   Zself_endpointZtrainer_endpointsr   r   r   rD   Þ   s
   
ÿz FLCommunicator.start_coordinatorc                 C   s"   | j d kr| j  |¡ d S tdƒ‚)Nzself.communicator_ is null)r   Úsave_fl_strategyr7   )r   Úmpr   r   r   rE   ä   s   
ÿzFLCommunicator.save_fl_strategyc                 C   s   i }| j d kr| j  ¡ }|S r   )r   Úquery_fl_clients_info)r   Zinfo_mpr   r   r   rG   ë   s   

z$FLCommunicator.query_fl_clients_infor   )r=   r>   r?   r   rD   rE   rG   Ú__classcell__r   r   rB   r   r   Ó   s
    	r   c                   @   s,   e Zd Zdd„ Zdd„ Zdd„ Zdd„ Zd	S )
r   c                 C   s   t  ¡ | _d S r   )r   r   Úscale_kvr$   r   r   r   r   ô   r1   zLargeScaleKV.__init__c                 C   ó   | j  ||¡ d S r   )rI   Úsave©r   ÚvarnameÚdirnamer   r   r   rK   ÷   ó   zLargeScaleKV.savec                 C   rJ   r   )rI   ÚloadrL   r   r   r   rP   ú   rO   zLargeScaleKV.loadc                 C   s   | j  |¡S r   )rI   Úsize)r   rM   r   r   r   rQ   ý   s   zLargeScaleKV.sizeN)r=   r>   r?   r   rK   rP   rQ   r   r   r   r   r   ò   s
    r   c                   @   s   e Zd Zdd„ Zdd„ ZdS )ÚHeterClientc                 C   s   t  |||¡| _d S r   )r   rR   Úheter_client_)r   ZendpointZprevious_endpointr
   r   r   r   r     s   
ÿzHeterClient.__init__c                 C   r/   r   )rS   r-   r$   r   r   r   r-     r1   zHeterClient.stopN)r=   r>   r?   r   r-   r   r   r   r   rR     s    rR   N)Úexecutorr   r@   r   Z1paddle.fluid.incubate.fleet.parameter_server.moder   Ú__all__Úobjectr   r   r   rR   r   r   r   r   Ú<module>   s    ,