o
    MeR                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlm  mZ d dlmZ d dlmZ d dlmZ d dlmZmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlm Z  ddlm!Z! ddlm"Z"m#Z# ddl$m%Z% ddl$m&Z& ddl$m'Z' ddl(m)Z) ddl*m+Z+ ddl,m-Z- ddl.m/Z/ ddl0m1Z1 ddl2m3Z3 eej4Z5G dd dZ6dS )    N)program_guard)append_backward)
get_logger)new_passPassContext   )DistributedContext)set_default_distributed_context)	Completer)Partitioner)get_all_process_groups)get_process_group)get_world_process_group)_g_process_group_mapProcessGroup)make_data_unshard)set_grad_var_shape)SerialProgramInfo)	Resharder)Cluster)mapping)DistributedOperator)DistributedTensor)Plannerc                   @   sb   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Zdd Z	dddZ
			dddZdd ZdS )AutoParallelizera/  
    AutoParallelizer is the main controller class to do the auto parallel process.
    And the auto parallel process will be triggered in the wrapped parallelize function.
    To facilitate the auto parallelization, it will contain information about program, cluster and the
    related context. In this basic version, the program information will be retrevied from 
    Fleet object, and the cluster information can be retrevied in the new created Cluster object,
    and the context information can be retrevied in the new created DistributedContext. 
    c                 C   s   || _ | j j| _| j j| _t | _d | _t	dd | _
| j
d ur+t | _| j| j
 t	dd | _t	dd }|d u r@d| _nd| _t | _t	d| _| jr\| j dkr\d| _d S d| _d S )NZPADDLE_CLUSTER_TOPO_PATHZPADDLE_RANK_MAPPING_PATHZPADDLE_ENABLE_AUTO_MAPPINGFTZPADDLE_NEED_RANK_MAPPINGtrue)_fleetZuser_defined_optimizer
_optimizerZ_user_defined_strategy_dist_strategyr   _dist_context_clusterosgetenvZ_cluster_topo_pathr   Zbuild_from_file_rank_mapping_path_enable_auto_mappingr   _pass_context_need_rank_mappinglower)selffleetZenable_auto_mapping_env r*   ]D:\Projects\ConvertPro\env\Lib\site-packages\paddle/distributed/auto_parallel/parallelizer.py__init__>   s*   



zAutoParallelizer.__init__c                 C   sB   t  }|jD ]}|jD ]}|jD ]}||v r|| qqqd S N)coreZkAutoParallelSuffixblocksopsZ
attr_namesZ_remove_attr)r(   main_programsuffixblockopZ	attr_namer*   r*   r+   _remove_distributed_attrsV   s   



z*AutoParallelizer._remove_distributed_attrsc           
      C   s   | j jr@t| j j}| j|d< ||d< ||d< |d r1| j|d< td|}||g|g| j	 ntd|}||g|g| j	 | j j
rlt| j j}| j|d< t||d< ||d< td	|}	|	|g|g| j	 d S d S )
Ndist_contextparams_gradslossZuse_pure_fp16Zbase_optZauto_parallel_fp16Zauto_parallel_ampno_grad_setZauto_parallel_recompute)r   ampcopydeepcopyZamp_configsr   r   r   applyr%   Z	recomputeZrecompute_configs)
r(   r1   startup_programr8   r7   r9   configZauto_parallel_fp16_passZauto_parallel_amp_passZauto_parallel_recompute_passr*   r*   r+   _apply_pre_optimization_passes`   s8   




z/AutoParallelizer._apply_pre_optimization_passesc                 C   sj   t || t||||| jjd}W d    n1 sw   Y  t| j| _| j| | jj| |S )N)Zdistop_context)	r   r   r   Zdist_op_contextr
   
_completerZcomplete_backward_annotationblock_stateZparse_backward_blocks)r(   r1   r>   r8   parameter_listr9   	callbacksr7   r*   r*   r+   _generate_backward~   s   z#AutoParallelizer._generate_backwardc                 C   sd   t | j}t|| ||}W d    n1 sw   Y  || j_t| j| _| j	| |S r-   )
r;   r<   r   r   Zapply_gradientsr   Z_serial_optimizerr
   rA   Zcomplete_update_annotation)r(   r1   r>   r7   Z	optimizerZoptimize_opsr*   r*   r+   _apply_optimize   s   z AutoParallelizer._apply_optimizec           	      C   s   | j jr-t| j j}| j|d< ||d< ||d< td|}||g|g| j | j	d}t| j j}| j|d< ||d< ||d< td|}||g|g| j | j j
rut| j j}| j|d< ||d< td|}||g|g| j d S d S )Nr6   r7   Zglobal_rankZauto_parallel_shardingZrank_idZauto_parallel_grad_clip!auto_parallel_gradient_merge_pass)r   Zshardingr;   r<   Zsharding_configsr   r   r=   r%   Zget_attrZgradient_mergeZgradient_merge_configs)	r(   r1   r>   rankr7   r?   Zauto_parallel_sharding_passZauto_parallel_clip_passrG   r*   r*   r+   _apply_post_optimization_passes   s@   



z0AutoParallelizer._apply_post_optimization_passesNFc                 C   sp  d }| j  }| j }| | jj}|d u r/t | _t	
d t| j| _| j|}n|}t|| _| jj| | |||| j| j| j}| ||||| j t| j|}	|	|||\}
}}| |
||}t|
| j t|
|| j t|
||| j|}|  | |
||| d }|stt}t   t!dg td< | jj"D ]
}td #|j$ q||||
|fS )NzStart annotation dist attr.r   )%_main_programclone_startup_programglobal_blockvar_lossnamer   r   _loggerinfor
   rA   Zcomplete_forward_annotationr;   r<   rB   Zparse_forward_blocksrE   _parameter_list_no_grad_set
_callbacksr@   r   	partitionrF   r   r   r   reshardrI   r   clearr   _process_meshes	add_ranks	processes)r(   rH   r6   relaunch_phaseZcompleted_main_programZserial_main_programZserial_startup_programZserial_lossr7   partitionerdist_main_progdist_startup_progdist_params_gradsdist_optimize_opsZ	resharderg_process_group_mapprocess_meshr*   r*   r+   _get_dist_program   s^   




z"AutoParallelizer._get_dist_programc           .      C   sH  |d usJ || _ || _|jj| _|| _|| _|| _| jr| j	r| j
d us+J di }t }d }| jjr]td t| j| j| j | j| j
}	t|	| dddd}
|
 \}}td |d urtd t  }tj|d	t  d
}i }i }i }|j D ]	\}}|j||< q|j D ]	\}}|j||< q||d< ||d< |j|d< t |d}t!"|| |tj#d< td|  W d    n1 sw   Y  |j$D ]}| %||\}}}}}||g||< qt&|| j
}t'|( }t | j)d}t*"|| W d    n	1 sw   Y  t+d}|r#|, dkr#dnd}|r6t-d t./t0j1j2j3j4j5 t+d}dd| j)g} tj#6dddkrRg d}!ng }!d|  d | }"t.j7dg|! t89|" }#t:;|#}$|$<  |$j=d ks{J d!t-d" t./d  d S t0j1> }d }t+dd }|d urt |d#S}t!?|}t@ }| jA jBD ]}%|d |%jCD  }&tE|%|&}|F| q| jA jG}'|'( D ]}(|d |(jCD  }&tH|(|&}|I| q|d |_W d    n	1 sw   Y  n#| jjrt| j| j| j | j| j
d$}	t|	| dddd}
|
 \}}|d ur3tJd })|jD ]	}*|)K|*jL q)| j%||dd%\}}}}}| jjrd}+|A jBD ]}%|%jMd&ksZ|%jMd'kr^d}+ nqL|+rt0jNO| t0j1P  W d    n	1 szw   Y  tQ },|,D ]}-||-j$vrq|-R  qtS| jT | U| ||||fS )(Nz5The cluster must not be none when using auto mapping.zStart searching dist attr.Zmcmc   )rP   Zmax_search_times)Zalgorithm_configzEnd searching dist attr.z"Start serialize searched dist attrZsearched_dist_context_z.pklops_dist_attrtensors_dist_attrZprocess_mesheswbZ!PADDLE_SEARCHED_DIST_CONTEXT_PATHz$End serialize searched dist attr to wZPADDLE_ENABLE_ELASTICr   TFz/Auto mapping finished, now do elastic re-launchZPADDLE_ORIGINAL_CMD_ARGS z--rank_mapping_pathZWITH_COVERAGEZOFFON)z-mZcoveragerunz--branchz-pz#-m paddle.distributed.fleet.launch z-ur   zLaunch failed with rank mappingz3Successfully do the second launch for auto mapping!rb)cluster)r\   Zsend_v2Zrecv_v2)VrO   rL   r3   programrJ   rS   rT   rU   r$   r&   r    r   r   Zauto_searchloggingrR   r   r   r   searchpathlibPathresolver!   pathjointimeZ_dist_ops_for_programitems	dist_attrZ_dist_tensors_for_programrY   openpickledumpenvironZranksrd   r   listvaluesr#   jsonr"   r'   printsysexitpaddledistributedr)   ZelasticmanagerZELASTIC_AUTO_PARALLEL_EXIT_CODEget
executableshlexsplit
subprocessPopenwait
returncodeZget_rankloadr   rM   r0   descidr   Zadd_dist_op_for_programvarsr   Zadd_dist_tensor_for_programr   rZ   r[   typeZstaticr   Zbarrierr   Zinstantiater	   r   r5   ).r(   r8   r>   rC   r9   rD   Zdist_programsZworld_process_groupr6   Zserial_program_infoplanner_cwdZsearched_dist_context_pathZsaved_dist_contextrf   rg   keydist_opdist_tensorZdist_context_filerH   ra   r`   r_   r^   rb   Zrank_mapping_dictZrank_mappingZrank_mapping_fileZenable_elasticZoriginal_cmd_argsZrank_mapping_argsZcoverage_argsZnew_cmd_argsZnew_cmdZnew_processr4   ry   r   rN   Zpg0rc   Zis_pipelineZall_process_groupsprocess_groupr*   r*   r+   parallelize   s\  




	















zAutoParallelizer.parallelizec                 C   s~   | j }||}||t| < | j D ])\}}|dks+|dks+|dks+|dks+|dkr2t||| qt||t|| q|S )NrJ   rL   r   r   rO   )	__class____new__r   __dict__rx   setattrr;   r<   )r(   memoclsresultkvr*   r*   r+   __deepcopy__  s   
(zAutoParallelizer.__deepcopy__)NF)NNN)__name__
__module____qualname____doc__r,   r5   r@   rE   rF   rI   rd   r   r   r*   r*   r*   r+   r   4   s    	

 ?
 6r   )7r!   r   r   r   r;   rr   r   rp   r{   rw   r   Zpaddle.fluid.coreZfluidr.   Zpaddle.fluidr   Zpaddle.fluid.backwardr   Z"paddle.distributed.utils.log_utilsr   Zpaddle.distributed.passesr   r   r6   r   r	   
completionr
   r]   r   r   r   r   r   r   r   utilsr   r   r   rW   r   rn   r   Zmapperr   r   r   r   r   r   r   INFOrQ   r   r*   r*   r*   r+   <module>   sF   
