o
    Ne                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z
d dlZd dlmZ d dlmZmZmZ d dlZd dlZd dlmZ d dlmZ ddlmZmZ ddlmZmZmZ ddlm Z m!Z!m"Z" d	d
l#m$Z$m%Z% d	dl&m'Z' d	dl(m)Z)m*Z* d	dl+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1m2Z2 d	dl3m4Z4m5Z5 d dl6m7Z7 dgZ8da9dd Z:e";e: G dd de<Z=G dd de=Z>G dd de=Z?dS )    N)
namedtuple)_set_expected_place_current_expected_place	set_flags)in_profiler_mode   )corelayers)_non_static_modein_dygraph_mode_in_legacy_dygraph)_set_SIGCHLD_handlerMP_STATUS_CHECK_INTERVALCleanupFuncRegistrar   )_IterableDatasetFetcher_MapDatasetFetcher)_InfiniteIterableSampler)default_collate_fndefault_convert_fn)ParentWatchDogget_worker_info_worker_loop_DatasetKind_IterableDatasetStopIteration_WorkerException_ResumeIteration)_flatten_batch_restore_batch)	benchmarkr   c                   C   s*   t d urzt   b W d S    Y d S d S N)_loader__del__ r#   r#   WD:\Projects\ConvertPro\env\Lib\site-packages\paddle/fluid/dataloader/dataloader_iter.py_clear_loaderC   s   r%   c                   @   sD   e Zd ZdZdd Zedd Zdd Zdd	 Zd
d Z	dd Z
dS )_DataLoaderIterBasez
    Iterator implement of DataLoader, will load and feed mini-batch
    data by setting in given dataloader.

    Args:
        loader(instance of DataLoader): instance of `fluid.io.DataLoader`
    c                 C   s   |j | _|jpg | _|j| _|j| _|j| _	|j
| _|j| _|j| _|j| _|j| _|j| _|jdkr6|jnt| _|j| _|j| _|j| _t| j | _!| jrU|j"pRt#| _$n|j"pYt%| _$d | _&d | _'t() | _*d S )Nr   )+Zdataset_datasetZ	feed_list
_feed_listZplaces_placesZreturn_list_return_listbatch_sampler_batch_samplerZ	drop_last
_drop_lastZauto_collate_batch_auto_collate_batchZnum_workers_num_workersZuse_buffer_reader_use_buffer_readerZprefetch_factor_prefetch_factorZuse_shared_memory_use_shared_memorytimeoutr   _timeoutZworker_init_fn_worker_init_fnZdataset_kind_dataset_kindZ
pin_memory_pin_memoryiter_index_sampler_sampler_iterZ
collate_fnr   _collate_fnr   _blocking_queue_thread	threadingEvent_thread_done_eventselfloaderr#   r#   r$   __init__Y   s,   z_DataLoaderIterBase.__init__c                 C   s6   | j r| jS | jtjkrttt| jS t	| jdS Nr   )
r.   r,   r6   r   ZMAPlistrangelenr'   r   rB   r#   r#   r$   r9   y   s
   z"_DataLoaderIterBase._index_samplerc                 C   s   | S r    r#   rI   r#   r#   r$   __iter__   s   z_DataLoaderIterBase.__iter__c                 C   s
   t | jS r    )rH   r,   rI   r#   r#   r$   __len__   s   
z_DataLoaderIterBase.__len__c                 C   "   | j   | jr| j  d S d S r    )r@   setr<   closerI   r#   r#   r$   _exit_thread_expectedly      
z+_DataLoaderIterBase._exit_thread_expectedlyc                 C   rL   r    )r@   rM   r<   killrI   r#   r#   r$   _exit_thread_unexpectedly   rP   z-_DataLoaderIterBase._exit_thread_unexpectedlyN)__name__
__module____qualname____doc__rD   propertyr9   rJ   rK   rO   rR   r#   r#   r#   r$   r&   P   s     
	r&   c                       sX   e Zd ZdZ fddZdd Zdd Zdd	 Zd
d Zdd Z	dd Z
dd Z  ZS )_DataLoaderIterSingleProcesszg
    Single process implement of DataLoaderIter, loading data from
    loader.data in main process
    c                    s\   t t| | t| j| j| j| j| j	| _
g | _| jt| j | _|   d| _| ad S NF)superrX   rD   r   Zcreate_fetcherr6   r'   r.   r;   r-   _dataset_fetcher_structure_infosr1   rH   r)   _blocking_queue_capacity_init_thread	_shutdownr!   rA   	__class__r#   r$   rD      s   
z%_DataLoaderIterSingleProcess.__init__c                 C   s   dd | j D | _dd | j D | _dd | j D | _dd | j D | _tt | jt	| j
dk| _t| j| j| j| j| j| j
| jd| j	| _tj| jt fd| _d| j_| j  d S )	Nc                 S      g | ]}|j qS r#   name.0vr#   r#   r$   
<listcomp>       z=_DataLoaderIterSingleProcess._init_thread.<locals>.<listcomp>c                 S   rb   r#   shapere   r#   r#   r$   rh      ri   c                 S   rb   r#   Zdtypere   r#   r#   r$   rh      ri   c                 S      g | ]}|j  qS r#   descZneed_check_feedre   r#   r#   r$   rh          
r   Ttargetargs)r(   
_var_names_shapes_dtypes_need_check_feedr   init_lod_tensor_blocking_queueVariabler]   rH   r)   r<   create_py_readerr0   r7   _readerr>   Thread_thread_loopr   r=   daemonstartrI   r#   r#   r$   r^      s(   
z)_DataLoaderIterSingleProcess._init_threadc                 C   sV  t | | j szt| j}| j|| j}W n ty&   |   Y d S w |d u s0| j r1ntt	|\}}| j
| | j rCnbzMt }|D ],}t|tjtjjfr]|  }nt|tjsqt }||t  |}|| qJ| j r~W n'z| j| W n   |   Y W n   |   tjt   Y | j r	|   d S r    )r   r@   is_setnextr:   r[   fetchStopIterationrO   r   r\   appendr   LoDTensorArray
isinstancepaddleTensoreagervalue
get_tensor	LoDTensorrM   CPUPlacer<   pushrR   sixreraisesysexc_info)rB   legacy_expected_placeindicesbatch	structurearrayslottmpr#   r#   r$   r}      sH   



-z)_DataLoaderIterSingleProcess._thread_loopc                    s  t  rtjdtjjd}|  zzt   t   t	 r5t
j j d }t| jd}nTt rG j }t| jd}nB jr j }tt|D ]
}||  ||< qU fddtt jD }dd t||D }t jdkr|d }n j }t   |W W t  r|  S S  ty    j     t j!t"#   Y nw W t  r|  d S d S t  r|  w w )NrX   rd   Z
event_typer   c                       g | ]} j d qS r   r\   poprf   _rI   r#   r$   rh         
z9_DataLoaderIterSingleProcess.__next__.<locals>.<listcomp>c                 S      g | ]	\}}t ||qS r#   r   rf   dsr#   r#   r$   rh         r   )$r   profilerRecordEventTracerEventType
Dataloaderbeginr   check_if_need_recordbefore_readerr   r   r   read_next_tensor_listr{   read_next_listr   r\   r   r   read_next_var_listr*   rG   rH   _move_to_listr)   zip	read_nextafter_readerendr   shutdown_try_shutdown_allr   r   r   r   rB   Ztrace_eventdataistructsr#   rI   r$   __next__  s^   









z%_DataLoaderIterSingleProcess.__next__c                 C   s\   | j r,| j  tdD ]}| j  rtd q n| j t ur'| j 	  d | _ d S d S )N   r   )
r=   r@   rM   rG   is_alivetimesleepr>   current_threadjoinrB   r   r#   r#   r$   _shutdown_thread/  s   



z-_DataLoaderIterSingleProcess._shutdown_threadc                 C      |   S r    r   rI   r#   r#   r$   r   @     z!_DataLoaderIterSingleProcess.nextc                 C   s>   | j sz| jr| j  d | _|   W d| _ d S d| _ w d S )NT)r_   r<   rN   r   rI   r#   r#   r$   r   C  s   

z._DataLoaderIterSingleProcess._try_shutdown_allc                 C      |    d S r    r   rI   r#   r#   r$   r"   R     z$_DataLoaderIterSingleProcess.__del__)rS   rT   rU   rV   rD   r^   r}   r   r   r   r   r"   __classcell__r#   r#   r`   r$   rX      s    7.rX   c                       s   e Zd Z fddZdd Zdd Zdd Zd	d
 Zd!ddZd"ddZ	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd  Z  ZS )#_DataLoaderIterMultiProcessc                    s   t t| | |j| _d| _| jdksJ d| jd | _d| _d| _	d| _
i | _g | _| jt| jt| j | _t | _tjjdtjd| _|   t| jD ]}|   qS|   d| _d S )Nr   z0Multi-process DataLoader invalid num_workers({}))lowhighF) rZ   r   rD   _persistent_workers_resume_worker_cntr/   format_data_queue	_send_idx	_rcvd_idx_batches_outstanding_task_infosr\   r1   maxrH   r)   _outstanding_capacityr>   Lock_thread_locknprandomrandintr   maxsize
_base_seed_init_workersrG   _try_put_indicesr^   r_   )rB   rC   r   r`   r#   r$   rD   X  s,   


z$_DataLoaderIterMultiProcess.__init__c                 C   s   g | _ g | _g | _tt| j| _t	 | _
t | _t | _t| jD ]>}t	 }| j| tjt| j| j|| j
| j| j| j| j| j|| j| j| jfd}d|_|  | j | | jd q&tt| tdd | j D  t   d S )Nrq   Tc                 s   s    | ]}|j V  qd S r    )pidrf   wr#   r#   r$   	<genexpr>  s    z<_DataLoaderIterMultiProcess._init_workers.<locals>.<genexpr>)!_workers_worker_status_indices_queues	itertoolscyclerG   r/   _workers_idx_cyclemultiprocessingQueuer   r?   _workers_done_eventr>   r@   r   Processr   r'   r6   r.   r;   r-   r5   r2   r   r~   r   r   Z_set_process_pidsidtupler   )rB   r   Zindices_queueworkerr#   r#   r$   r     s4   




 
z)_DataLoaderIterMultiProcess._init_workersc                 C   sB   | j d ur	 z| j   W n   | j   | j   Y d S qd S r    )r   
get_nowaitcancel_join_threadrN   rI   r#   r#   r$   _clear_and_remove_data_queue  s   


z8_DataLoaderIterMultiProcess._clear_and_remove_data_queuec                 C   s   dd | j D | _dd | j D | _dd | j D | _dd | j D | _tt | jt	| j
dk| _t| j| j| j| j| j| j
| jd| j	| _t | _tj| jt fd| _d| j_| j  d S )	Nc                 S   rb   r#   rc   re   r#   r#   r$   rh     ri   z<_DataLoaderIterMultiProcess._init_thread.<locals>.<listcomp>c                 S   rb   r#   rj   re   r#   r#   r$   rh     ri   c                 S   rb   r#   rl   re   r#   r#   r$   rh     ri   c                 S   rm   r#   rn   re   r#   r#   r$   rh     rp   r   Trq   )r(   rt   ru   rv   rw   r   rx   ry   r   rH   r)   r<   rz   r0   r7   r{   r>   r?   r@   r|   r}   r   r=   r~   r   rI   r#   r#   r$   r^     s*   

z(_DataLoaderIterMultiProcess._init_threadc                 C   sD  | j $ | j| _t| jD ]}| j| t  |  jd7  _qW d    n1 s*w   Y  | jdkr>t	d | jdks4| j
 t| jkrxt rWtj| j d }nt r`| j  n| jri| j  n| j }| j
 t| jksHd| _d| _d| _i | _g | _dg| j | _t| j| _t| j D ]}| !  qd S )Nr   r   g      ?T)"r   r/   r   rG   r   putr   r   r   r   r<   sizerH   r)   r   r   r   r   r{   r   r   r   r*   r   r   r   r   r\   r   r8   r9   r:   r   r   )rB   	worker_idr   r   r#   r#   r$   _reset  s>   




z"_DataLoaderIterMultiProcess._resetFc                 C   s:   | j | s
| jr|r| j| d  d| j |< d S d S d S rY   )r   r   r   r   )rB   r   r   r#   r#   r$   _shutdown_worker  s   z,_DataLoaderIterMultiProcess._shutdown_workerNc              
   C   s   | j sazR|   |   | j  t| jD ]	}| j|dd q| j s<| jD ]}|	| q&| j
D ]}|  |  q1W tt|  d| _ d S W tt|  d| _ d S tt|  d| _ w d S )NT)r   )r_   rO   r   r   rM   rG   r/   r   r   r   r   r   rN   r   Z_erase_process_pidsr   )rB   r3   r   r   qr#   r#   r$   r     s,   





z-_DataLoaderIterMultiProcess._try_shutdown_allc              
   C   sl  t | | j s|  }| j s|d u r|   nt|tr/| jdks'J |  jd8  _qzuzNt	 }| j
rC|D ]}|| q:n/|D ],}t|tjtjjfrX|  }nt|tjslt }||t  |}|| qE| j|s}| j  W n ty } z|   tjt   W Y d }~nd }~ww W |  jd7  _n|  jd7  _w | j r	d S d S )Nr   r   )r   r@   r   	_get_datarO   r   r   r   r   r   r2   r   r   r   r   r   r   r   rM   r   r<   r   rN   	ExceptionrR   r   r   r   r   r   )rB   r   r   r   Ztensorr   r   er#   r#   r$   r}     sL   




 z(_DataLoaderIterMultiProcess._thread_loopc              
   C   s  | j  sl| jtjkrL| j| jk r?| j| j }t|dks%| j	|d  r&n&| j| j= |  jd7  _|  j
d8  _
| j| jk s| jsL| j
t| jk rLd S | j| jv rot| j| j dkro| j| j}| j|d  |d S z
| jj| jd}W n~ ty } zr| j  rW Y d }~q g }t| jD ]\}}| j	| r| s|| | | qt|dkr|   ddd |D }td	t||t|tjst|trW Y d }~q |   t !d
| t"j#t$%   W Y d }~nrd }~ww | jtjkr"t|t&r"| jrd| j	|j'< n| |j' |  j
d8  _
| (  q |\}}	}
t|t)r9|	d u r9|
d u r9|S t|	t*rG|   |	#  || jkrY| j|= | j|
 |	S | j|  |	|
f7  < q | j  rd S d S )Nr   r   r   r   )r3   z, c                 s   s    | ]}t |jV  qd S r    )strr   r   r#   r#   r$   r   }  s    z8_DataLoaderIterMultiProcess._get_data.<locals>.<genexpr>z1DataLoader {} workers exit unexpectedly, pids: {}zLDataLoader reader thread failed({}) to read data from workers' result queue.F)+r@   r   r6   r   ZITERr   r   r   rH   r   r   r   r)   r   r\   r   r   getr4   r   	enumerater   r   r   rR   r   RuntimeErrorr   r   queueEmptyIOErrorloggingerrorr   r   r   r   r   r   r   r   r   )rB   infor   r   Zfailed_workersr   r   Zpidsidxr   r   r#   r#   r$   r   =  s   
	




z%_DataLoaderIterMultiProcess._get_datac              	   C   s   | j | jks
J d| j^ zt| j}W n ty%   Y W d    d S w t| jD ]}t| j}| j	| r9 n
q+	 W d    d S | j
| | j|f |f| j| j< |  j d7  _ |  jd7  _W d    d S 1 snw   Y  d S )Nz'too many indices have been put to queuer   )r   r   r   r   r:   r   rG   r/   r   r   r   r   r   r   )rB   r   r   Z
worker_idxr#   r#   r$   r     s,   

"z,_DataLoaderIterMultiProcess._try_put_indicesc                 C   r   r    r   rI   r#   r#   r$   r"     r   z#_DataLoaderIterMultiProcess.__del__c                 C   s   |  d d S rE   r   rI   r#   r#   r$   _shutdown_on_exit  s   z-_DataLoaderIterMultiProcess._shutdown_on_exitc                    s  t  rtjdtjjd}|  zzt   t    j	t
 jk r4 jr*t j   j  t rLtj j d }t| jd}nTt r^ j }t| jd}nB jr j }tt
|D ]
}||  ||< ql fddtt
 jD }dd t ||D }t
 jdkr|d }n j! } "  t #  |W W t  r|$  S S  ty    jsǈ j%   &  t'j(t)*   Y nw W t  r|$  d S d S t  r|$  w w )Nr   r   r   c                    r   r   r   r   rI   r#   r$   rh     r   z8_DataLoaderIterMultiProcess.__next__.<locals>.<listcomp>c                 S   r   r#   r   r   r#   r#   r$   rh     r   r   )+r   r   r   r   r   r   r   r   r   r   rH   r)   r   r   r@   rM   r<   rN   r   r   r   r   r{   r   r   r\   r   r   r   r*   rG   r   r   r   _on_output_batchr   r   r   r   r   r   r   r   r   r#   rI   r$   r     sl   











z$_DataLoaderIterMultiProcess.__next__c                 C   r   r    r   rI   r#   r#   r$   r     r   z _DataLoaderIterMultiProcess.nextc                 C   s.   t t| jD ]}|  jd8  _|   qd S rE   )rG   rH   r)   r   r   r   r#   r#   r$   r    s   
z,_DataLoaderIterMultiProcess._on_output_batch)Fr    )rS   rT   rU   rD   r   r   r^   r   r   r   r}   r   r   r"   r  r   r   r  r   r#   r#   r`   r$   r   V  s     +"

,
.o>r   )@osr   r   r   signalnumbersr  r   r>   numpyr   r   collectionsr   Zpaddle.fluid.frameworkr   r   r   r  r   Zpaddle.profilerr   Zpaddle.profiler.utilsr    r   r	   Z	frameworkr
   r   r   Zmultiprocess_utilsr   r   r   Zfetcherr   r   r+   r   Zcollater   r   r   r   r   r   r   r   r   r   Zflatr   r   Zpaddle.profiler.timerr   __all__r!   r%   registerobjectr&   rX   r   r#   r#   r#   r$   <module>   sD   $

D C