o
    MeY                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlmZ d dlmZ d dlmZ eddZdZdZd	Zd
ZG dd dZG dd dZG dd deZG dd deZdS )    N)cloud_utils)launch_utils)
get_loggerINFOELASTICe   f   x   <   c                   @   s   e Zd ZdZdZdS )ElasticLevel      N)__name__
__module____qualname__FAULT_TOLERANCEr    r   r   XD:\Projects\ConvertPro\env\Lib\site-packages\paddle/distributed/fleet/elastic/manager.pyr   ,   s    r   c                   @   s    e Zd ZdZdZdZdZdZdS )ElasticStatus	completederrorZholdZrestartexitN)r   r   r   	COMPLETEDERRORHOLDRESTARTEXITr   r   r   r   r   1   s    r   c                   @   s<   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd ZdS )LauncherInterfacec                 C   s   || _ g | _d S N)argsprocs)selfr   r   r   r   __init__;   s   
zLauncherInterface.__init__c                 C   s  t jdkr5| jD ]'}|j d u r/t t |jjtj	 |j
r%|j
  td|jj qtd | jD ] }|j d u rX|j  |j
rN|j
  td|jj q8tddD ]+}d}| jD ]}|j d u ryt |jjtj d}qe|std	  dS td q^dS )
Nntzterminate process group gid:{}r   zterminate process id:{}r   2   FTzterminated all the procs)osnamer    procpollZkillpgZgetpgidpidsignalSIGTERMZlog_fncloseloggerinfoformattimesleep	terminaterangekillZSIGKILL)r!   pstepaliver   r   r   _terminate_procs?   s<   








z"LauncherInterface._terminate_procsc                 C   s   d}d }| j D ]0}|j }|d u rd}q|dkr7|tkr&td |  S td td|j| |}q|s@|d u r@dS |S )NFTr   z+return form elastic auto parallel re-launchzABORT!!! ABORT!!! ABORT!!!z<ERROR rank {} error with exit code {}, check log for detail.)	r    r'   r(   ELASTIC_AUTO_PARALLEL_EXIT_CODEr-   r.   r   r/   rank)r!   r7   resultr5   retr   r   r   _check_procs`   s(   




zLauncherInterface._check_procsc                 C      t r   NotImplementedErrorr!   r   r   r   launchu      zLauncherInterface.launchc                 C   r>   r   r?   rA   r   r   r   stopx   rC   zLauncherInterface.stopc                 C   r>   r   r?   rA   r   r   r   watch{   rC   zLauncherInterface.watchN)	r   r   r   r"   r8   r=   rB   rD   rE   r   r   r   r   r   9   s    !r   c                	   @   s   e Zd Zdd Z	d,dedededefdd	Zd-ddZdd Z	defddZ
dd Zdd Zd.defddZdd Zdd Zdd Zd d! Zd"d# Zd$d% Zd&d' Zd(d) Zd*d+ ZdS )/ElasticManagerc              	      s  |_ |jp
td}|jptd}|j\__|j	p$td}|j
p/ttdd}|jp7td}|r<|n _	t|\__ttdt_ttdt d _t rtd	d
_tjd_ttdd_tdd
_tdd
}|d_n2|jptd	d
_jd}	t|	_ttdd_|	jj_fdd|	D _dj	jf _t !dj  t !dj dj  ttdt"j#_$jjksjdkrjdkrt"j#_$t !d jdkrjjkrt"j%_$t !d |s6tdr6tdr6d&tdtd}t 'd&|| g _(d_)d_*d_+d _,|r]d|vs]|r]jsmt !d &||j d_-d S d!_-|_.d"| _/j/d# _0j/d$ _1j/d% _2d
3d&d' t4d(D }
d)&j0|
t55 _6	 j.7j/d* fd+d,}j.8j0|}j.9  fd-d.}t:j;d/|d!d0}|<  j.j7j6t=>jd1 j.7j2t=>d2&jj fd3d4}j.?j2|}||g_@d _Ad S )5NZPADDLE_ELASTIC_SERVERZPADDLE_ELASTIC_JOB_IDZPOD_IPZPADDLE_ELASTIC_SCALEr   ZPADDLE_ELASTIC_FORCEZPADDLE_ELASTIC_TIMEOUTZPADDLE_ELASTIC_TTLPADDLE_TRAINERS ,ZPADDLE_PORTZ6170DISTRIBUTED_TRAINER_ENDPOINTSPADDLE_TRAINER_ENDPOINTSZFLAGS_START_PORTc                    s   g | ]	}d | j f qS %s:%d)
start_port).0iprA   r   r   
<listcomp>   s    z+ElasticManager.__init__.<locals>.<listcomp>rM   zstart job with np=z	trainers=z, trainer_endpoints_list=Z#PADDLE_ELASTIC_FAULT_TOLERANC_LEVELz+start job with ElasticLevel.FAULT_TOLERANCEz#start job with ElasticLevel.ELASTICZ PADDLE_ELASTIC_ETCD_SERVICE_HOSTZ PADDLE_ELASTIC_ETCD_SERVICE_PORTz{}:{}zinit with server {} host {}F:z7Elastic is not enabled with server {} name {} and np {}Tz/paddle/z/nodesz/npz
/endpointsc                 s   s    | ]}t d V  qdS )abcdefghijklmnopqrstuvwxyzN)randomchoice)rO   _r   r   r   	<genexpr>   s    

z*ElasticManager.__init__.<locals>.<genexpr>   z{}/{}{}   0c                    s`   dd  j  jD  _ jrtt jn j _td j d j  d _	d  _
d S )Nc                 S      g | ]	}t |d  qS r   six
ensure_strrO   ir   r   r   rQ          zCElasticManager.__init__.<locals>.host_call_back.<locals>.<listcomp>zhost_call_back curr_host=z, hosts:T)etcd
get_prefixnode_prefixhostslistsetr-   r.   	curr_host	need_syncelastic_startup_time)eventrA   r   r   host_call_back   s   
z/ElasticManager.__init__.<locals>.host_call_backc               
      s   	 zD   dd jjD } | rtt| n| } tdj d|   j| vrDtdj  jj	j
tjd W n tyc } ztd|t  W Y d }~d S d }~ww t d	  q)
NTc                 S   rZ   r[   r\   r_   r   r   r   rQ      ra   zDElasticManager.__init__.<locals>.lease_heartbeat.<locals>.<listcomp>z[lease_heartbeat] curr_host=, hosts=z [lease_heartbeat] register host=leasez&[lease_heartbeat] internal error:{} {}   )refreshrb   rc   rd   rf   rg   r-   r.   rh   put	host_pathr]   b	Exceptionr   r/   	traceback
format_excr0   r1   )re   eZelastic_ttlZ
host_leaser!   r   r   lease_heartbeat   s<   



z0ElasticManager.__init__.<locals>.lease_heartbeatrz   )r&   targetdaemonrn   {}|{}c                    s`    j sd S t j jd pd}|d\ _  _t	d
 j  t	d
 j d S )Nr   rH   |z%set DISTRIBUTED_TRAINER_ENDPOINTS {} zset PADDLE_TRAINERS {} )dist_endpointsr]   r^   rb   getendpoints_pathsplittrainersr-   r.   r/   )rk   ZedpsrA   r   r   endpoints_call_back  s   z4ElasticManager.__init__.<locals>.endpoints_call_back)Br   Zelastic_serverr%   getenvZjob_id	_parse_npnpmin_npmax_nphostscaleintforce	_get_hostr   Zget_device_proc_infoZdevice_modedevices_per_procELASTIC_TIMEOUTelastic_timeoutELASTIC_TTLrN   r   Zuse_paddlecloudr   lenr   r   trainer_endpoints_listips_host_to_endpointsrh   r-   r.   r   r   elastic_levelr   r/   debugre   stoppedsigintri   rj   enablerb   prefixrd   Znp_pathr   joinr3   r0   rs   rr   Zadd_watch_prefix_callbackro   	threadingThreadstartr]   rt   Zadd_watch_callbackwatcheslauncher)r!   r   Zetcd_clientserverr&   r   r   r   Ztrainer_endpointsZnode_ipsZnode_tagrl   Z
host_watchrz   Zkeepalived_threadr   Zendpoints_watchr   ry   r   r"      s   











	

zElasticManager.__init__  ip_port_listr   rN   returnc           
         s   g }|D ]6}| d}t|dkr|d  t|d }n| |}dd t||t| D }| fdd|D  qd|}	|	S )	NrR   r   r   r   c                 S      g | ]}|qS r   r   )rO   xr   r   r   rQ   ;      z5ElasticManager._host_to_endpoints.<locals>.<listcomp>c                    s   g | ]}d  |f qS rL   r   )rO   portrP   r   r   rQ   <  s    rI   )r   r   r   r3   extendr   )
r!   r   r   rN   Zendpoint_listip_port	endpointsr   Zportsr   r   r   r   r   -  s   

z!ElasticManager._host_to_endpointsFc                 C   s   t d| | jr| j  | jsd S |r| j| jd | j	D ]}| j
| q"| j| j dd | j| jD }t|dkrM| j| j d S d S )Nzmanager exist completed {}   1c                 S   r   r   r   r_   r   r   r   rQ   Q  r   z'ElasticManager.exit.<locals>.<listcomp>r   )r-   r.   r/   r   rD   r   rb   rr   r   r   Zcancel_watchdeleters   rc   rd   r   Zdelete_prefix)r!   r   rE   re   r   r   r   r   A  s   

zElasticManager.exitc                 C   s   | j jstd d S td ttj }tj| j j|tj	tj	dd
 \}}|r3td d S td|d   d S )Nzskip pre_hookzexecute pre_hook...T)envstdoutstderrshellzpre_hook exec failedzpre_hook exec result: zutf-8)r   Zelastic_pre_hookr-   r.   copyr%   environ
subprocessPopenPIPEcommunicatewarningdecodestrip)r!   Zcurrent_envouterrr   r   r   pre_hookU  s    


zElasticManager.pre_hookr   c                 C   s   |pt dd}|d}d }}t|dkr+t|d }|dkr#dn|}d}||fS t|dkrQt|d }t|d }|dkrCdn|}||krK|n|}||fS td| d)	z2
        np format is "MIN" or "MIN:MAX" 
        ZPADDLE_ELASTIC_NP0rR   r   r   r   zthe np=z) needs to be in "MIN" or "MIN:MAX" format)r%   r   r   r   r   
ValueError)r!   r   Znp_strZnp_dictr   r   r   r   r   r   e  s"   

zElasticManager._parse_npc                 C   s$   zt t t  W S    Y dS )Nz	127.0.0.1)socketgethostbynamegetfqdngethostnamerA   r   r   r   r   }  s   zElasticManager._get_hostc                 C   s$   | j sdS t| j| jd dkS )NTr   r   )r   r   rb   r   r   rA   r   r   r   
_completed  s   zElasticManager._completedN	host_listc              
   C   s  |r|| _ ndd | j| jD | _ | j rtt| j n| j | _ | jtjkr3t	| j | j
kr1dS dS | jtjkrt	| j }|| j
krEdS | jsMt | _|| jkrWd | _dS || jkr|| jk rt | j }|| jkrtd| d| j d| d| j  dS dS d | _dS dS )	Nc                 S   rZ   r[   r\   r_   r   r   r   rQ     ra   z)ElasticManager._match.<locals>.<listcomp>TFzawait for timeout, you can set value by PADDLE_ELASTIC_TIMEOUT,                         hosts_num=z	, min_np=z(,                         interval_time=z, elastic_timeout=)re   rb   rc   rd   rf   rg   r   r   r   r   r   r   rj   r0   r   r   r   r-   r.   )r!   r   Z	hosts_numZinterval_timer   r   r   _match  sL   




zElasticManager._matchc              	   C   s"   | j | jtd|| d S )Nr}   )rb   rr   r   r]   rt   r/   )r!   r   re   r   r   r   _update_endpoint  s   
zElasticManager._update_endpointc                 C   s   t tdd}td| j d| j  | j| jv r;| jtjd< | jtjd< t	d
| j t	d
| j d S | j| j}|d	krU| j| | j|< | j| j|< nd

|tjd< ddd | jD }|| j_|tjd< d S )NPADDLE_TRAINER_IDzself.curr_host=z, self.dist_endpoints=rJ   rG   z,update env DISTRIBUTED_TRAINER_ENDPOINTS {} zupdate env PADDLE_TRAINERS {} r   {}rI   c                 S      g | ]	}| d d qS rR   r   r   rO   	host_portr   r   r   rQ         z9ElasticManager._update_fault_tolrance.<locals>.<listcomp>)r   r%   r   r-   r   rh   r   r   r   r.   r/   re   indexr   r   r   )r!   r:   idxre   r   r   r   _update_fault_tolrance  s(   z%ElasticManager._update_fault_tolrancec              
   C   s   t | j}tdt| j d| j d| j d|  | jD ]}||vr*|| qd	|
| jtjd< ddd	 |D }|| j_|tjd
< t|| _d|tjd< | jtjd< || _d S )Nzelastic scale out, from  to rm   , host_endpoints=r   r   rI   c                 S   r   r   r   r   r   r   r   rQ     r   z<ElasticManager._update_elastic_scale_out.<locals>.<listcomp>rG   rK   rJ   )r   deepcopyr   r-   r.   r   re   r   appendr/   r   rh   r%   r   r   r   r   r   )r!   host_endpointsZcurr_host_portre   r   r   r   _update_elastic_scale_out  s(   $






z(ElasticManager._update_elastic_scale_outc              
   C   s  t | j}td| j dt| j d| j d|  t }g }t	| jD ]!\}}|
|}|t| jd krB||sB|||< q&|| q&d}g }tt| jD ]}||sjt|dkrj|| ||< |d7 }||| qStd|  || _dd	 |D }	d
|	}
| || j}|
| j_d|
| jtjd< |
tjd< t|| _d
|tjd< |tjd< | ||
 d S )Nzelastic scale in, from r   rm   r   r   r   z#elastic scale in, sorted_endpoints=c                 S   r   r   r   )rO   r   r   r   r   rQ     r   z;ElasticManager._update_elastic_scale_in.<locals>.<listcomp>rI   r   r   rG   rK   rJ   )r   r   r   r-   r.   r   r   re   dict	enumerater   r   r   r3   r   r   r   r   r   r/   rh   r%   r   r   )r!   r   Zendpoints_dictZunsorted_endpointsidr   r   Z
idle_indexZsorted_endpointsZip_listre   Znew_endpointsr   r   r   _update_elastic_scale_in  sD   $








z'ElasticManager._update_elastic_scale_inc                 C   s   t | jdksJ d| jtjkr|   d S t | j| jkr.td| j  |   d S t | j| jkr<| 	  d S | 
  d S )Nr   zhosts emptyzelastic startup, hosts=)r   re   r   r   r   r   r   r-   r.   r   r   rA   r   r   r   _update_hosts  s   zElasticManager._update_hostsc                 C   sl   | j sd S d}| js4|  rtd| j |   d S td| j| j |d7 }t	
d | jr
d S )Nr   zready with hosts {}z!not ready for np {} with hosts {}r   )r   r   r   r-   r.   r/   re   r   r   r0   r1   )r!   r   r   r   r   wait&  s   
	zElasticManager.waitc                 C   s$   | j rd S || j| _| j  d S r   )r   r   r   rB   )r!   r   r   r   r   run6  s   zElasticManager.runc                 C   s   | j rd| _ | jsm| j }td|  |d urRtd| |tkr3td | j	  t
jS |dkr9dnd}| j|d |rFt
jS | jtjkrOt
jS t
jS |  se|  r]| j re| j	  t
jS td | jr	| jru| j	  t
jS )	NFzlauncher.watch():zjob exit with code {}zjob re-launch for auto parallelr   T)r   r   )ri   r   r   rE   r-   r   r.   r/   r9   rD   r   r   r   r   r   r   r   r   r   r   r   r0   r1   r   )r!   r<   r   r   r   r   rE   =  s4   





zElasticManager.watchc                 C   s   | j r|   || _d| _d S )NT)r   r   r   r   )r!   r   framer   r   r   signal_handlerb  s   
zElasticManager.signal_handler)r   )Fr   )r   r   r   r"   rf   r   strr   r   r   r   r   r   r   r   r   r   r   r   r   r   rE   r   r   r   r   r   rF      s8     0

*/%rF   )r0   r   r%   r]   r   loggingr*   rT   r   rv   r   Zpaddle.distributed.fleetr   r   Z"paddle.distributed.utils.log_utilsr   r-   ZELASTIC_EXIT_CODEr9   r   r   r   r   objectr   rF   r   r   r   r   <module>   s.   
F