o
    Ne7                     @   sJ  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ ddl	m
Z
 ddlmZmZ ddlmZmZmZ ddlmZmZ dd	lmZ d dlZd
gZG dd deZG dd deZG dd deZG dd deZdadd
 ZG dd deZ G dd deZ!dZ"dZ#dZ$dZ%dZ&dZ'e(ej)j*d d Z+dZ,d$d d!Z-d"d# Z.dS )%    N)
namedtuple   )core   )_IterableDatasetFetcher_MapDatasetFetcher)_cleanup_mmapCleanupFuncRegistrarMP_STATUS_CHECK_INTERVAL)_non_static_mode_in_eager_without_dygraph_check)_flatten_batchget_worker_infoc                   @   s   e Zd Zdd ZdS )_IterableDatasetStopIterationc                 C   s
   || _ d S N)	worker_id)selfr    r   ND:\Projects\ConvertPro\env\Lib\site-packages\paddle/fluid/dataloader/worker.py__init__$   s   
z&_IterableDatasetStopIteration.__init__N)__name__
__module____qualname__r   r   r   r   r   r   "   s    r   c                   @   s   e Zd ZdS )_ResumeIterationN)r   r   r   r   r   r   r   r   (   s    r   c                   @   s    e Zd ZdZdZedd ZdS )_DatasetKindr   r   c                 C   s>   | t jkrt||||S | t jkrt||||S td| )Nzunknown Dataset kind {})r   MAPr   ITERr   NotImplementedErrorformat)kinddatasetauto_collate_batch
collate_fn	drop_lastr   r   r   create_fetcher0   s   

z_DatasetKind.create_fetcherN)r   r   r   r   r   staticmethodr$   r   r   r   r   r   ,   s
    r   c                   @   s   e Zd Zdd Zdd ZdS )ParentWatchDogc                 C   s   t  | _d| _d S NT)osgetppid_parent_pid_parent_aliver   r   r   r   r   ?   s   

zParentWatchDog.__init__c                 C   s   | j rt | jk| _ | j S r   )r+   r(   r)   r*   r,   r   r   r   is_aliveC   s   zParentWatchDog.is_aliveN)r   r   r   r   r-   r   r   r   r   r&   =   s    r&   c                   C   s   t S )an  
    Get DataLoader worker process information function, this function is
    used to split data copy in worker process for IterableDataset
    (see :code:`paddle.io.IterableDataset`), worker information contains
    following fields:

    :attr:`num_workers`: total worker process number, see `paddle.io.DataLoader`

    :attr:`id`: the worker processs id, count from 0 to :attr:`num_workers - 1`

    :attr:`dataset`: the dataset object in this worker process

    Returns:
        WorkerInfo: an instance of WorkerInfo which contains fields above.

    .. note::
        For more usage and examples, please see :code:`paddle.io.IterableDataset`

    Example:

        .. code-block:: python

            import math
            import paddle
            import numpy as np
            from paddle.io import IterableDataset, DataLoader, get_worker_info

            class SplitedIterableDataset(IterableDataset):
                def __init__(self, start, end):
                    self.start = start
                    self.end = end

                def __iter__(self):
                    worker_info = get_worker_info()
                    if worker_info is None:
                        iter_start = self.start
                        iter_end = self.end
                    else:
                        per_worker = int(
                            math.ceil((self.end - self.start) / float(
                                worker_info.num_workers)))
                        worker_id = worker_info.id
                        iter_start = self.start + worker_id * per_worker
                        iter_end = min(iter_start + per_worker, self.end)

                    for i in range(iter_start, iter_end):
                        yield np.array([i])

            place = paddle.CPUPlace()
            dataset = SplitedIterableDataset(start=2, end=9)
            dataloader = DataLoader(
                dataset,
                places=place,
                num_workers=2,
                batch_size=1,
                drop_last=True)

            for data in dataloader:
                print(data)
            # outputs: [2, 5, 3, 6, 4, 7]

    )_worker_infor   r   r   r   r   N   s   ?c                       s(   e Zd ZdZdd Z fddZ  ZS )
WorkerInfoFc                 K   s(   |  D ]
\}}t| || qd| _d S r'   )itemssetattr_WorkerInfo__initialized)r   kwargskvr   r   r   r      s   
zWorkerInfo.__init__c                    s*   | j rtd| jjtt| ||S )Nz&Cannot assign attributes to {} objects)r2   RuntimeErrorr   	__class__r   superr/   __setattr__)r   keyvalr7   r   r   r9      s
   zWorkerInfo.__setattr__)r   r   r   r2   r   r9   __classcell__r   r   r<   r   r/      s    r/   c                   @   s   e Zd ZdddZdd ZdS )_WorkerExceptionNc                 C   s2   || _ |pt }|d | _dtj| | _d S )Nr    )r   sysexc_infoexc_typejoin	tracebackformat_exceptionexc_msg)r   r   rA   r   r   r   r      s   
z_WorkerException.__init__c                 C   s:   d | j| jj| j}t| jdd r| j|d| |)Nz0DataLoader worker({}) caught {} with message:
{}message)rG   )r   r   rB   r   rF   getattr)r   msgr   r   r   reraise   s   
z_WorkerException.reraiser   )r   r   r   r   rJ   r   r   r   r   r>      s    
r>   iװCl   u=& l   y iXl   y isI   l    c                    s   t t}fdd dd }|| t@ | d? dg} fdd|D }tt|D ]}tt|D ]}||krB|||  || ||< q/q'g }|D ]!}	|	|A t@ }
|t t@ }|
| t@ }
|
|
t? A t@ }
||
 qH|S )	Nc                    s8   |  A t @ }  t t @  |   t @ } | | t? A t @ } | S r   )MASK32MULT_AXSHIFT)value)hash_const_Ar   r   hash   s
   z_generate_states.<locals>.hashc                 S   s8   t |  t@ }t| t@ }|| t@ }||t? A t@ }|S r   )
MIX_MULT_LrL   
MIX_MULT_RrN   )xyZresult_xZresult_yresultr   r   r   mix   s
   z_generate_states.<locals>.mix    r   c                    s   g | ]} |qS r   r   ).0Zentropy)rQ   r   r   
<listcomp>   s    z$_generate_states.<locals>.<listcomp>)INIT_AINIT_BrL   rangelenMULT_BrN   append)	base_seedr   Zhash_const_BrW   ZentropyspoolijZstatespstater   )rQ   rP   r   _generate_states   s(   rg   c              
      s  zkzJt t t  zdd l dd l}dd l}W n	 ty$   Y nw ||	 }|	| t
	|  j	t||	 t|	|
| |dad }z|d urO||	 t|| |||}W n   t|	}Y d}t }| rLz|t}W n
 tjy}   Y qgw t|tr||d d f d}t|| ||d}qg|d u r| s|sJ dn| s|rqg|\}}z*|d ur|}d }nt
jjjt
 d ||}W d    n1 sw   Y  W n1 t y } z$t|t!r|tj"kr|t#|	 d}n
||t|	d f W Y d }~n@d }~ww t|tr|||d f t$|\}}|r@dd  fd	d
|D }||||f t%| n||||f | slW n t&yW   Y n   t'j(t)*   Y W |rkt  d S d S |rtt  w w )Nr   )idnum_workersr    seedFTz#get None when worker done_event set)Zplacec                 S   s   t  rt| S |  S r   )r   r   _array_to_share_memory_tensorZ_share_memory)Ztensorr   r   r   tensor_share_memoryb  s   
z)_worker_loop.<locals>.tensor_share_memoryc                    s*   g | ]}t | jrt|n|qS r   )
isinstanceZndarrayr   rk   )rY   bnprl   r   r   rZ   f  s    
z _worker_loop.<locals>.<listcomp>)+r	   registerr   r   Z_set_process_signal_handlernumpytimerandomImportErrorrj   paddlerg   r/   r.   r   r$   r>   r&   r-   getr
   queueEmptyrm   r   putis_setZfluidZdygraphguardZCPUPlacefetch	ExceptionStopIterationr   r   r   Z_remove_tensor_list_mmap_fdsKeyboardInterruptsixrJ   r@   rA   )r    Zdataset_kindZindices_queueZ	out_queueZ
done_eventr!   r"   r#   Zinit_fnr   ri   Zuse_shared_memoryra   rs   rt   rj   Zinit_exceptionfetcherZiterator_drainedZparent_watch_dogdataidxindicesbatcheZ	structureZtensor_listr   ro   r   _worker_loop  s   






@

r   )r   r   )/r(   r   r@   rv   rr   rp   rD   collectionsr   r?   r   r   r   r   Zmultiprocess_utilsr   r	   r
   Z	frameworkr   r   Zflatr   rx   __all__objectr   r   r   r&   r.   r   r/   r>   r[   rM   r\   r_   rR   rS   ZdtypeZuint32itemsizerN   rL   rg   r   r   r   r   r   <module>   s@   B0
)