o
    Me8!                     @   st   d dl Z d dlZd dlZd dlZd dlmZmZ d dlmZ d dl	m
Z
 g Zdadd Zdd Zd	d
 Zdd ZdS )    N)ProcessManager)core)wait_server_readyc                 C   sf   ddl m} |t| |d}|  d}|dds| s-t| |dds| r|  d S )Nr   )KVServer)size   runningF)	Z*paddle.distributed.fleet.utils.http_serverr   intstartgetZshould_stoptimesleepstop)portZhttp_server_dr   r   Zhttp_serverZwait_seconds r   UD:\Projects\ConvertPro\env\Lib\site-packages\paddle/distributed/parallel_with_gloo.py_start_kv_server   s   
r   c                 C   s   |dk du s
J dt  }| }d|d< | dkr8d|i}ttt|dd ||fd	}d
|_d
|d< |  t|g t	
 }| |_||_|dd |_t|dd |_d|_d|_t	|at  | dkrvd|d< |  dS dS )u/  
    Initialize parallel environment with gloo for cpu only.

    Args:
        - rank_id（int, required) - the index of current rank;
        - rank_num (int, required) - the number of ranks in this parallel env;
        - server_endpoint (str, required) - endpoint of server to init gloo context in ip:port format;

    Returns:
        None

    Examples:
        .. code-block:: python

            import paddle
            import multiprocessing
            from contextlib import closing
            import socket

            port_set = set()

            def find_free_port():
                def _free_port():
                    with closing(socket.socket(socket.AF_INET,
                        socket.SOCK_STREAM)) as s:
                        s.bind(('', 0))
                        return s.getsockname()[1]
                while True:
                    port = _free_port()
                    if port not in port_set:
                        port_set.add(port)
                        return port

            def test_gloo_init(id, rank_num, server_endpoint):
                paddle.distributed.gloo_init_parallel_env(
                    id, rank_num, server_endpoint)

            def test_gloo_init_with_multiprocess(num_of_ranks):
                jobs = []
                server_endpoint = "127.0.0.1:%s" % (find_free_port())
                for id in range(num_of_ranks):
                    p = multiprocessing.Process(
                        target=test_gloo_init,
                        args=(id, num_of_ranks, server_endpoint))
                    jobs.append(p)
                    p.start()
                for proc in jobs:
                    proc.join()

            if __name__ == '__main__':
                # Arg: number of ranks (processes)
                test_gloo_init_with_multiprocess(2)
       FzRrank_num should greater than or equal to 2 for parallel environment initialzation.r	   r   Z_worker:   )targetargsTi  i N)r   dictr   r   r
   splitdaemonr   r   r   ZGlooParallelStrategyZrankrank_num
ip_addressZip_portZinit_secondsZrun_secondsZGlooParallelContext_global_gloo_ctxinitjoin)Zrank_idr   Zserver_endpointmanagerZhttp_server_statusr   Zhttp_server_procZgloo_strategyr   r   r   gloo_init_parallel_env(   s<   7

r"   c                   C   s   t dusJ dt   dS )a  
    Call barrier function with initialized gloo context.

    Args:
        None

    Returns:
        None

    Examples:
        .. code-block:: python

            import paddle
            import multiprocessing
            from contextlib import closing
            import socket

            port_set = set()

            def find_free_port():
                def _free_port():
                    with closing(socket.socket(socket.AF_INET,
                        socket.SOCK_STREAM)) as s:
                        s.bind(('', 0))
                        return s.getsockname()[1]
                while True:
                    port = _free_port()
                    if port not in port_set:
                        port_set.add(port)
                        return port

            def test_gloo_barrier(id, rank_num, server_endpoint):
                paddle.distributed.gloo_init_parallel_env(
                    id, rank_num, server_endpoint)
                paddle.distributed.gloo_barrier()

            def test_gloo_barrier_with_multiprocess(num_of_ranks):
                jobs = []
                server_endpoint = "127.0.0.1:%s" % (find_free_port())
                for id in range(num_of_ranks):
                    p = multiprocessing.Process(
                        target=test_gloo_barrier,
                        args=(id, num_of_ranks, server_endpoint))
                    jobs.append(p)
                    p.start()
                for proc in jobs:
                    proc.join()

            if __name__ == '__main__':
                # Arg: number of ranks (processes)
                test_gloo_barrier_with_multiprocess(2)
    Nzgloo context is not initialzed.)r   Zbarrierr   r   r   r   gloo_barrier   s   6r#   c                   C   s   t dur
t   dS dS )a  
    Release the parallel environment initialized by gloo

    Args:
        None

    Returns:
        None

    Examples:
        .. code-block:: python

            import paddle
            import multiprocessing
            from contextlib import closing
            import socket

            port_set = set()

            def find_free_port():
                def _free_port():
                    with closing(socket.socket(socket.AF_INET,
                        socket.SOCK_STREAM)) as s:
                        s.bind(('', 0))
                        return s.getsockname()[1]
                while True:
                    port = _free_port()
                    if port not in port_set:
                        port_set.add(port)
                        return port

            def test_gloo_release(id, rank_num, server_endpoint):
                paddle.distributed.gloo_init_parallel_env(
                    id, rank_num, server_endpoint)
                paddle.distributed.gloo_barrier()
                paddle.distributed.gloo_release()

            def test_gloo_release_with_multiprocess(num_of_ranks):
                jobs = []
                server_endpoint = "127.0.0.1:%s" % (find_free_port())
                for id in range(num_of_ranks):
                    p = multiprocessing.Process(
                        target=test_gloo_release,
                        args=(id, num_of_ranks, server_endpoint))
                    jobs.append(p)
                    p.start()
                for proc in jobs:
                    proc.join()

            if __name__ == '__main__':
                # Arg: number of ranks (processes)
                test_gloo_release_with_multiprocess(2)
    N)r   releaser   r   r   r   gloo_release   s   7r%   )ossysr   warningsmultiprocessingr   r   Zpaddle.fluidr   Z5paddle.distributed.fleet.base.private_helper_functionr   __all__r   r   r"   r#   r%   r   r   r   r   <module>   s   
_: