o
    MeL!                     @   s`   d dl mZmZ ddlmZ ddlZddlZddlZddlZG dd deZ	G dd	 d	e	Z
dS )
   )
ControllerControleMode   )
DeviceType    Nc                   @   s0   e Zd Zedd Zdd Zdd Zdd Zd	S )
CollectiveControllerc                 C   s*   |r|j d| j tj|j_dS dS )N
{} enabledTF)loggerdebugformat__name__r   
COLLECTIVEargsrun_modeclsctx r   `D:\Projects\ConvertPro\env\Lib\site-packages\paddle/distributed/launch/controllers/collective.pyenable   s
   
zCollectiveController.enablec                 C   s:   | j jjd u r| j jjr| j jjr|   d S |   d S )N)r   r   master
start_portips_build_pod_with_args_build_pod_with_masterselfr   r   r   	build_pod$   s   "zCollectiveController.build_podc           	         s      j_t jjj jjjd} fdd|D } jj	
d|  jjj|v r<| jjj jj nd} |  jjj } jjj jjj}t jjD ]}dt|d jjd|| d|dt|d||||  d|| dt|t|d
}t|dkrȈ jjjjtjkr| jjj   jjdkr||d|i n|||| i n|d	d
i d| } j||d q[dS )N,c                    s0   g | ]}t  jjD ]}| d |  q
qS ):)rangepodreplicas).0hpr   r   r   r   
<listcomp>0   s
    z=CollectiveController._build_pod_with_args.<locals>.<listcomp>zjob endpoints: {}r   {})
PADDLE_GLOBAL_SIZEPADDLE_LOCAL_SIZEPADDLE_GLOBAL_RANKPADDLE_LOCAL_RANKPADDLE_NNODESPADDLE_TRAINER_ENDPOINTSPADDLE_CURRENT_ENDPOINTPADDLE_TRAINER_IDPADDLE_TRAINERS_NUMPADDLE_RANK_IN_NODEr   PADDLE_DISTRI_BACKENDgloo
workerlog.Zenvslog_fileT)pod_replicasr!   r"   intr   r   r   r   splitr	   r
   r   nodeipindexsave_pod_logdeviceget_selected_device_keyget_selected_devicesdevicesr    lenjoinstrdtyper   CUSTOM_DEVICEupdateget_custom_device_envsadd_container)	r   r   job_endpointsrank_offsetselected_dev_keyselected_dev_listier7   r   r&   r   r   *   sT   



z)CollectiveController._build_pod_with_argsc                    s      j_t jjj j_ jj } fdd jj	 jjD }t
 jj jj jj jjjjd jjj|d|d} jd jj jj| jj jj\}}| j_t|dk rhdS d	d |D } jjd
|  | tdd |D }tdd |d | D }	 |d d }dd |D }	 j   jjj }
 jjj jjj}t jjD ]~}|d|d jjd|| d|d jjd|	|| d|| d|t |d}t|dkr* jjjjt!j"kr|# jjj$   jjdkr |#|
d|i n|#|
|| i n|#ddi d| } j%||d qdS )Nc                    s   g | ]}d   jjj|qS ){}:{})r   r   r;   r<   )r#   r%   r   r   r   r'   f   s    z?CollectiveController._build_pod_with_master.<locals>.<listcomp>rQ   r   )namerankr"   rF   	candidate	endpointsz/{}/infor   Fc                 S   s   g | ]}t |qS r   )jsonloadsr#   rO   r   r   r   r'   }   s    zsync peers done {}c                 S      g | ]}|d  qS r"   r   rX   r   r   r   r'          c                 S   rY   rZ   r   rX   r   r   r   r'      r[   r   rT   c                 S   rY   )rU   r   rX   r   r   r   r'      r[   r(   )ZPADDLE_MASTERr)   r*   r+   r,   r-   r.   r/   r0   r1   r2   r3   r4   r5   r6   T)&r8   r!   r"   r9   r   r   rS   r;   Zget_free_portZget_free_portsrV   dumpsrR   r?   rF   r   r<   rD   r   Z
sync_peersjobidrC   r	   r
   r>   sumresetr@   rA   rB   r    rE   r   rG   rH   rI   rJ   )r   portrU   dataZ	peer_listrS   Zglobal_sizerL   Zcollective_masterrK   rM   rN   rO   rP   r7   r   r   r   r   ]   st   

	



z+CollectiveController._build_pod_with_masterN)r   
__module____qualname__classmethodr   r   r   r   r   r   r   r   r      s    
	3r   c                   @   s(   e Zd Zedd Zdd Zdd ZdS )CollectiveElasticControllerc                 C   s<   |j jr|j jdr|jd| j tj|j _	dS dS )Nzetcd://r   TF)
r   r   
startswithr	   r
   r   r   r   r   r   r   r   r   r   r      s
   
z"CollectiveElasticController.enablec                 C   s4   | j jdkr| jjd | j| j j| jj d S )Ndefaultz?Using default job name may cause conflict, add --job_id in args)	r]   r^   r   r	   warningr   Zregister_heartbeatr!   rR   r   r   r   r   register   s
   z$CollectiveElasticController.registerc                 C   s  t | jjj}| jjr|n|d }|   | jj| jjj	kru| 
  | jjd | j| jj| jj|\}}|r>|| j_n| jjd| j n+| jjd| j |  sZq| j| jjj |   |  rln	| jj| jjj	ks| jjd| j d S )N
   zWaiting peer ready...zpeer not ready {}zRun {}zJob done {})r9   r   r   Zelastic_timeoutr]   Zelasticrj   r!   ZrestartZmax_restartZ	build_jobr	   infor   Zwait_peer_readyZreplicas_minZreplicas_maxr"   ri   r   r
   r   Z
set_statusstatusRUNNINGZ
deploy_podwatch)r   timeoutokr"   r   r   r   run   s.   
zCollectiveElasticController.runN)r   rc   rd   re   r   rj   rr   r   r   r   r   rf      s
    
rf   )
controllerr   r   Zcontext.devicer   rV   ossixtimer   rf   r   r   r   r   <module>   s    