o
    Meyj  ã                   @   sœ   d Z ddlmZ g d¢ZG dd„ deƒZG dd„ deƒZG dd	„ d	eƒZG d
d„ deƒZG dd„ deƒZ	G dd„ deƒZ
G dd„ deƒZG dd„ deƒZdS )zDefination of device workers.é    )Úprint_function)ÚDeviceWorkerÚHogwildÚDownpourSGDÚSectionÚDownpourSGDOPTÚHeterSectionc                   @   s:   e Zd ZdZdd„ Zddd„Zdd„ Zd	d
„ Zdd„ ZdS )r   z×
    DeviceWorker is an abstract class, which generates worker desc.
    This class is an inner class that we do computation logics within
    the implementation. For example, execution of a program or a graph.
    c                 C   s   d| _ d| _dS ©zInit.N)Ú_programÚ_infer©Úself© r   úJD:\Projects\ConvertPro\env\Lib\site-packages\paddle/fluid/device_worker.pyÚ__init__   s   
zDeviceWorker.__init__Fc                 C   ó
   || _ dS )z~
        set inference flag for current device worker

        Args:
            infer(bool): whether to do inference
        N)r   )r   Zinferr   r   r   Ú
_set_infer$   ó   
zDeviceWorker._set_inferc                 C   r   )zn
        Set fleet desc.

        Args:
            fleet_desc(PSParameter): pslib.PSParameter object
        N)Ú_fleet_desc)r   Ú
fleet_descr   r   r   Ú_set_fleet_desc-   r   zDeviceWorker._set_fleet_descc                 C   r   )z\
        Set program.

        Args:
            program(Program): a Program object
        N)r
   )r   Úprogramr   r   r   Ú_set_program6   r   zDeviceWorker._set_programc                 C   s   t dƒ‚)zs
        Generator worker desc.

        Args:
            trainer_desc(TrainerDesc): a TrainerDesc object
        zXDeviceWorker does not implement gen_worker_desc, please use Hogwild or DownpourSGD, etc.)ÚNotImplementedError)r   Útrainer_descr   r   r   Ú_gen_worker_desc?   s   ÿzDeviceWorker._gen_worker_descN)F)	Ú__name__Ú
__module__Ú__qualname__Ú__doc__r   r   r   r   r   r   r   r   r   r      s    
			r   c                       ó(   e Zd ZdZ‡ fdd„Zdd„ Z‡  ZS )r   z.
    Hogwild is a kind of SGD algorithm.

    c                    ó   t t| ƒ ¡  dS r	   )Úsuperr   r   r   ©Ú	__class__r   r   r   Q   ó   zHogwild.__init__c                 C   sÔ  d|_ | jr|jj g d¢¡ tƒ }tt| jƒƒ}t	d|ƒ | jdkr+t	dƒ t
dƒ | jj}|s3dS |j}|j}|d rR|d D ]}|j |g¡ |j |g¡ qAdd	lm} | ¡ rfd
|vrfd|vrfdS |d }	t	d|	ƒ |	D ]i}
t	d|
|ƒ |
|krÚ|j ¡ }||_t	d|	| d ƒ |	| d D ]	}|j |g¡ q”|	| d D ]}|j |g¡ | |¡ q¤|	| d D ]	}|j |g¡ q¹|	| d D ]}|j |g¡ | |¡ qÉ nqqd|_ |j}|j|_| d¡du r÷| d¡du r÷tdƒ‚| d¡dur¸|d }| |¡du rtd| ƒ‚|d | }| ¡ jD ]}|j|v r4|j ¡ }|j  |j!¡ |j|_qt"| ¡ j#ƒ}t$|ƒD ]F}|j# ¡ }| ¡ j#| j|_|j% | ¡ j#| j&¡ |j' | ¡ j#| j(¡ |j) | ¡ j#| j*¡ | j+j,j-j.| j/j0|_0d|_1d|_2qA| ¡ jD ] }|j|v r¬|j ¡ }|j|_|j  |j!¡ |j3 |j4¡ q|j | ¡ j5¡ n"| d¡}t	d|ƒ | 6¡ D ]\}}|j ¡ }|j  |¡ ||_qÆ| jrè|j g d¢¡ dS dS )z™
        Generator worker desc, which device worker is HogwildWorker.

        Args:
            trainer_desc(TrainerDesc): a TrainerDesc object
        ZHogwildWorker©ÚfeedÚpush_sparseÚpush_sparse_v2Ú
push_denseZdistributed_push_sparseÚsendúdevice worker program id:Nú2program of current device worker is not configuredéÿÿÿÿÚstat_var_namesr   ©Úversionr   Úprogram_configsúdevice worker program_configs:údevice workerúdevice worker pull dense:Ú
pull_denser(   r*   Úpull_sparseÚprogram_id_to_workerÚdense_table_configú=opt_info must have program_id_to_worker or dense_table_configú$%s not found in program_id_to_workerÚ ú!device worker dense_table_config:©r(   r)   r*   )7Údevice_worker_namer   Zhogwild_paramÚskip_opsÚextendÚsetÚstrÚidr
   ÚprintÚexitÚ
_fleet_optÚdownpour_paramr/   Ú,paddle.fluid.incubate.fleet.parameter_serverr1   Úis_transpilerÚprogram_configÚaddÚ
program_idÚpush_sparse_table_idÚpush_dense_table_idÚpull_sparse_table_idÚpull_dense_table_idÚpull_dense_paramÚ
thread_numÚ
device_numÚgetÚ
ValueErrorÚget_descÚdense_tableÚtable_idÚdense_value_nameÚdense_variable_nameÚlenÚsparse_tableÚrangeÚsparse_key_nameÚslot_keyÚsparse_value_nameÚ
slot_valueÚsparse_grad_nameÚslot_gradientr   Úserver_paramÚdownpour_server_paramÚdownpour_table_paramÚaccessorÚfea_dimÚemb_dimÚlabel_var_nameÚdense_grad_nameÚdense_gradient_variable_nameÚskip_opÚitems)r   r   Údense_table_setrM   Úopt_infoÚdownpourZhogwildÚir1   r2   ÚpidÚpcÚpull_threadÚprog_id_to_workerÚworkerrX   Ú
sparse_lenr]   r9   rY   Úvarnamesr   r   r   r   U   sä   




ÿñÿÿÿÿ
ÿ€
ÿÿÿÿÿ
ÿþ

ÿ€



ÿÿzHogwild._gen_worker_desc©r   r   r   r   r   r   Ú__classcell__r   r   r#   r   r   K   ó    r   c                       r    )ÚDownpourLitez3
    DownpourLite is a kind of SGD algorithm.

    c                    r!   r	   )r"   r~   r   r   r#   r   r   r   Ò   r%   zDownpourLite.__init__c                 C   sÂ  t dƒ d|_| jr|jj g d¢¡ tƒ }tt| j	ƒƒ}t d|ƒ | j	dkr/t dƒ t
dƒ | j	j}|s7dS |j}|d rL|d D ]	}|j |g¡ qBd	d
lm} | ¡ r`d|vr`d|vr`dS |d }t d|ƒ |D ]i}	t d|	|ƒ |	|krÔ|j ¡ }
||
_t d|| d ƒ || d D ]	}|
j |g¡ qŽ|| d D ]}|
j |g¡ | |¡ qž|| d D ]	}|
j |g¡ q³|| d D ]}|
j |g¡ | |¡ qÃ nqk|j}|j|_| d¡du rî| d¡du rîtdƒ‚| d¡dur¯|d }| |¡du rtd| ƒ‚|d | }| ¡ jD ]}|j|v r+|j ¡ }|j |j ¡ |j|_qt!| ¡ j"ƒ}t#|ƒD ]F}|j" ¡ }| ¡ j"| j|_|j$ | ¡ j"| j%¡ |j& | ¡ j"| j'¡ |j( | ¡ j"| j)¡ | j*j+j,j-| j.j/|_/d|_0d|_1q8| ¡ jD ] }|j|v r£|j ¡ }|j|_|j |j ¡ |j2 |j3¡ q„|j | ¡ j4¡ n"| d¡}t d|ƒ | 5¡ D ]\}}|j ¡ }|j |¡ ||_q½| jrß|j g d¢¡ dS dS )zž
        Generator worker desc, which device worker is DownpourLiteWorker.

        Args:
            trainer_desc(TrainerDesc): a TrainerDesc object
        zcreate DownpourLiteWorkerZDownpourLiteWorkerr&   r,   Nr-   r.   r/   r   r0   r   r2   r3   r4   r5   r6   r(   r*   r7   r8   r9   r:   r;   r<   r=   r>   )6rE   r?   r   rH   r@   rA   rB   rC   rD   r
   rF   rG   r/   rI   r1   rJ   rK   rL   rM   rN   rO   rP   rQ   rR   rS   rT   rU   rV   rW   rX   rY   rZ   r[   r\   r]   r^   r_   r`   ra   rb   rc   rd   r   re   rf   rg   rh   ri   rj   rk   rl   rm   rn   ro   )r   r   rp   rM   rq   rr   rs   r1   r2   rt   ru   rv   rw   rx   rX   ry   r]   r9   rY   rz   r   r   r   r   Ö   sà   




ÿñÿÿÿÿ
ÿ€
ÿÿÿÿÿ
ÿþ

ÿ€



ÿÿzDownpourLite._gen_worker_descr{   r   r   r#   r   r~   Ì   r}   r~   c                       r    )r   z=
    DownpourSGD is a kind of distributed SGD algorithm.
    c                    r!   )zD
        Init.
        initialize downpourSGD device worker
        N)r"   r   r   r   r#   r   r   r   P  ó   zDownpourSGD.__init__c                 C   sh  t ƒ }tt| jƒƒ}| jdkrtdƒ tdƒ | jj}|d }|j}|D ]z}||krž|j 	¡ }||_
|| d D ]	}	|j |	g¡ q8|| d D ]}	|j |	g¡ | 	|	¡ qH|| d D ]	}	|j |	g¡ q]|| d D ]}	|j |	g¡ | 	|	¡ qmd	|| v rœ|| d	 }
|
 ¡ D ]\}}|j 	¡ }||_||_qŒ nq$| d
d¡|_|j}|j|_| d¡du r¸tdƒ‚|d }| |¡du rÉtd| ƒ‚|d | }| ¡ jD ]}	|	j|v rë|j 	¡ }|j |	j¡ |	j|_qÔt| ¡ j ƒ}t!|ƒD ]k}	|j  	¡ }| ¡ j |	 j|_|j" | ¡ j |	 j#¡ |j$ | ¡ j |	 j%¡ |j& | ¡ j |	 j'¡ |d s<d|v rL|d dkrL| j(j)j*j+|	 j,j-|_.|j.|_-n| j(j)j*j+|	 j,j-d |_.|j.d |_-d|_/q÷|d rw|d D ]
}	|j0 |	g¡ ql| ¡ jD ] }	|	j|v r›|j 	¡ }|	j|_|j |	j¡ |j1 |	j2¡ q||j3 | ¡ j4¡ | j5r²d|_6d|_7dS dS )úš
        Generator worker desc, which device worker is DownpourWorker.

        Args:
            trainer_desc(TrainerDesc): a TrainerDesc object
        Nr-   r.   r2   r(   r*   r7   r6   Úcond2denseidZworker_classZDownpourWorkerr8   ú'opt_info must have program_id_to_workerr;   Úuse_cvmÚno_cvmTé   Úclickr/   F)8rB   rC   rD   r
   rE   rF   rG   rH   rK   rL   rM   rN   rA   rO   rP   rQ   ro   Zpartial_pushdense_condtable_mapÚkeyÚvaluerU   r?   rR   rS   rT   rV   rW   rX   rY   rZ   r[   r\   r]   r^   r_   r`   ra   rb   rc   rd   r   re   rf   rg   rh   ri   rj   rk   r/   rl   rm   r@   rn   r   r*   r(   )r   r   rp   rM   rq   r2   rr   rt   ru   rs   r   r‡   rˆ   Zmc_maprv   rw   rx   rX   ry   r]   r   r   r   r   W  sÆ   


ìÿÿ

ÿ€
ÿÿÿÿÿ
ÿþ

ÿÿÿ

ÿ€
þzDownpourSGD._gen_worker_descr{   r   r   r#   r   r   K  ó    r   c                       r    )r   z@
    DownpourSGDOPT is a kind of distributed SGD algorithm.
    c                    r!   )zG
        Init.
        initialize downpourSGDOPT device worker
        N)r"   r   r   r   r#   r   r   r   ¼  r   zDownpourSGDOPT.__init__c                 C   s`  t ƒ }tt| jƒƒ}| jdkrtdƒ tdƒ | jj}|d }|j}|D ]Z}||kr~|j 	¡ }||_
|| d D ]	}	|j |	g¡ q8|| d D ]}	|j |	g¡ | 	|	¡ qH|| d D ]	}	|j |	g¡ q]|| d D ]}	|j |	g¡ | 	|	¡ qm nq$d	|_|j}
|j|
_| d
¡du r”tdƒ‚|d
 }| |¡du r¥td| ƒ‚|d
 | }| ¡ jD ]}	|	j|v rÇ|
j 	¡ }|j |	j¡ |	j|_q°t| ¡ jƒ}t|ƒD ]k}	|j 	¡ }| ¡ j|	 j|_|j | ¡ j|	 j¡ |j  | ¡ j|	 j!¡ |j" | ¡ j|	 j#¡ |d sd|v r(|d dkr(| j$j%j&j'|	 j(j)|_*|j*|_)n| j$j%j&j'|	 j(j)d |_*|j*d |_)d|_+qÓd|v rO|j|d v rOd|_,d|v r_|j|d v r_d|_-|d rs|d D ]
}	|j. |	g¡ qh| ¡ jD ] }	|	j|v r—|j 	¡ }|	j|_|j |	j¡ |j/ |	j0¡ qx|j1 | ¡ j2¡ | j3r®d|_4d|_5dS dS )r€   Nr-   r.   r2   r(   r*   r7   r6   ZDownpourWorkerOptr8   r‚   r;   rƒ   r„   Tr…   r†   Zlocal_tablesZasync_tablesr/   F)6rB   rC   rD   r
   rE   rF   rG   rH   rK   rL   rM   rN   rA   rO   rP   rQ   r?   rR   rS   rT   rU   rV   rW   rX   rY   rZ   r[   r\   r]   r^   r_   r`   ra   rb   rc   rd   r   re   rf   rg   rh   ri   rj   rk   Úis_localÚis_asyncr/   rl   rm   r@   rn   r   r*   r(   )r   r   rp   rM   rq   r2   rr   rt   ru   rs   rv   rw   rx   rX   ry   r]   r   r   r   r   Ã  sÆ   

óÿ

ÿ€
ÿÿÿÿÿ
ÿþ

ÿÿÿÿÿ

ÿ€
þzDownpourSGDOPT._gen_worker_descr{   r   r   r#   r   r   ·  r‰   r   c                       r    )r   zSectionWorker.c                    r!   r	   )r"   r   r   r   r#   r   r   r   $  r%   zSection.__init__c                 C   sø   ddl m} ddlm} d|_| jj}|j}|d |_|d |_	|d |_
|d	 |_|d
 }|dv s6J dƒ‚|dkr<dnd}||_|j}|d }	|j |	 ¡  ¡ ¡ |d }
|d }| ¡ rgt|
|jƒsfJ ‚n| ¡ rst|
|jƒssJ ‚|j|_||_dS )z˜
        Generator worker desc, which device worker is SectionWorker.
        Args:
            trainer_desc(TrainerDesc): a TrainerDesc object
        r   ©Útext_formaté   ©ÚcoreZSectionWorkerÚnum_microbatchesÚstart_cpu_core_idÚpipeline_stageÚnum_pipeline_stagesÚschedule_mode)úF-then-BZ1F1Bz>The schedule mode for pipeline must be one of F-then-B or 1F1Br–   Úsection_programÚplaceÚplace_idN)Úgoogle.protobufr   r<   r   r?   r
   Z_pipeline_optÚsection_paramr‘   r’   r“   r”   r•   Úsection_configÚprogram_descÚParseFromStringÚ	_get_descÚserialize_to_stringZis_compiled_with_cudaÚ
isinstanceZ	CUDAPlaceZis_compiled_with_npuZNPUPlacer˜   r™   )r   r   r   r   Zpipeline_optr›   Zschedule_mode_strr•   Úcfgr   r˜   r™   r   r   r   r   (  s8   




þ
ÿ
zSection._gen_worker_descr{   r   r   r#   r   r   !  ó    r   c                       r    )r   zHeterSectionWorker.c                    r!   r	   )r"   r   r   r   r#   r   r   r   U  r%   zHeterSection.__init__c                 C   sp   ddl m} ddlm} d|_| jj}|j}|d |_|d |_	|d |_
|j}|d	 }|j | ¡  ¡ ¡ d
S )z
        Generator worker desc, which device worker is HeterSectionWorker.
        Args:
            trainer_desc(TrainerDesc): a TrainerDesc object
        r   rŒ   rŽ   r   ZHeterSectionWorkerr‘   r“   r”   r—   N)rš   r   r<   r   r?   r
   Z_heter_pipeline_optÚheter_section_paramr‘   r“   r”   rœ   r   rž   rŸ   r    )r   r   r   r   Zheter_pipeline_optr¤   r¢   r   r   r   r   r   Y  s&   ÿÿÿ
ÿzHeterSection._gen_worker_descr{   r   r   r#   r   r   R  r£   r   c                   @   s   e Zd Zdd„ ZdS )ÚDeviceWorkerFactoryc                 C   s   |  ¡ }tƒ | ƒ S )N)Ú
capitalizeÚglobals)r   Zworker_typeÚ	classnamer   r   r   Ú_create_device_workerr  s   z)DeviceWorkerFactory._create_device_workerN)r   r   r   r©   r   r   r   r   r¥   p  s    r¥   N)r   Ú
__future__r   Ú__all__Úobjectr   r   r~   r   r   r   r   r¥   r   r   r   r   Ú<module>   s   3 lj1