o
    Me3b                     @   sz  d dl mZ ddlmZ d dlZd dlZd dlZd dlm	Z	m
Z
mZ d dlZd dlZddlmZmZmZmZmZ ddlmZmZ dgZejjjd	ejjjd
ejjjdejjjdejjjdejjjdejjjdejjj dejjj!dejjj"dejjj#dejjj$diZ%dd Z&	d&ddZ'd&ddZ(	d&ddZ)e*e+efe,efdfdd Z-G d!d" d"e.Z/G d#d$ d$e.Z0G d%d de.Z1dS )'    )print_function   )coreN)ziprangexrange)Variabledefault_main_program_current_expected_place_non_static_mode_in_eager_without_dygraph_check)_cpu_num	_cuda_ids
DataFeederboolfloat16uint16float32float64int8int16int32int64uint8	complex64
complex128c                 C   s   t | tjjr| tv rt|  S n2t | tr4| ttjtj	tj
tjtjtjtjtjtjtjtjfv r3| jS n| dv r<t| S | dv rBdS td|  )N)r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   )Zbfloat16r   zdtype must be any of [bool, float16, uint16, float32, float64, int8, int16, int32, int64, uint8, complex64, complex128], but received %s)
isinstancer   VarDescVarType_PADDLE_DTYPE_2_NUMPY_DTYPEtyper   npr   r   r   r   r   r   r   r   r   r   r   __name__str	TypeErrordtype r'   HD:\Projects\ConvertPro\env\Lib\site-packages\paddle/fluid/data_feeder.pyconvert_dtype.   s*   
r)    c                 C   s&   t | |t|| t| j|||| d S N)
check_typer   check_dtyper&   )input
input_nameexpected_dtypeop_nameextra_messager'   r'   r(   check_variable_and_dtypeP   s   r3   c                 C   s   t  rd S ddlm} | r't|ts|f}|tjf7 }t r&|tjj	f7 }n"t| tjr5t
d||ttdrIt| tjj	rIt
d||t| |s[t
d|||t| |f d S )Nr   )in_declarative_modezPlease use `with fluid.dygraph.guard()` as context or `fluid.enable_dygraph()` to switch to imperative mode firstly. Because received '{}' in {} is a imperative Variable.eagerz6The type of '%s' in %s must be %s, but received %s. %s)r   Zdygraph.baser4   r   tupler   ZVarBaser   r5   ZTensorr$   formathasattrr    )r.   r/   expected_typer1   r2   r4   r'   r'   r(   r,   Y   s@   


r,   c                 C   s|   t  rd S t| dv rtd|||f  t| dv r)|dvr)td|||f  t| |vr<td|||t| |f d S )N)r   z?The data type of '%s' in %s only support float16 in GPU now. %s)r   )reshapeZlookup_tablescalezCThe data type of '%s' in %s only support bfloat16 in OneDNN now. %sz;The data type of '%s' in %s must be %s, but received %s. %s)r   r)   warningswarnr$   )Zinput_dtyper/   r0   r1   r2   r'   r'   r(   r-      s.   r-   )r   r   c                 C   s   t  rd S t| d|| |d ur9t| ts9| D ]!}t|d|| |d ur8t|tr8t|jd||dd| q|d urLt| trNt| jd|| d S d S d S )Nshapezelement of shapez9If element of shape is Tensor, its data type should be {}z, )r   r,   r   r   r-   r&   r7   join)r>   r1   Zexpected_shape_typeZexpected_element_typeZexpected_tensor_dtypeitemr'   r'   r(   check_shape   s"   rA   c                   @   s<   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd ZdS )DataToLoDTensorConverterc                 C   sZ   || _ || _|| _d}| jD ]}|dk r|d7 }|dkr!d | _ nqt|| _|   d S Nr   r   )place	lod_levelr>   r)   r&   _reset)selfrD   rE   r>   r&   Znegtive_countsr'   r'   r(   __init__   s   

z!DataToLoDTensorConverter.__init__c                 C   s$   g | _ dd tj| jD | _d S )Nc                 S   s   g | ]}g qS r'   r'   .0_r'   r'   r(   
<listcomp>   s    z3DataToLoDTensorConverter._reset.<locals>.<listcomp>)datasixmovesr   rE   lodrG   r'   r'   r(   rF      s   zDataToLoDTensorConverter._resetc                 C   s   |  || j| j d S r+   )_feed_impl_rQ   rE   )rG   rN   r'   r'   r(   feed   s   zDataToLoDTensorConverter.feedc                 C   sR   |dkr| j | d S |d t| |D ]}| ||dd  |d  qd S rC   )rN   appendlenrS   )rG   rN   rQ   rE   Z	each_datar'   r'   r(   rS      s   z$DataToLoDTensorConverter._feed_impl_c                 C   sD   t | j|D ]\}}||kr|dkr|dkrtd| j|qd S )Nr   zDShape not match. What is defined in data layer is {}, but receive {})r   r>   
ValueErrorr7   )rG   r>   s1s2r'   r'   r(   _check_shape   s   
z%DataToLoDTensorConverter._check_shapec                 C   s   t j| j| jd}| jr0t|jt| jkr0z|| j}W n ty/   td| j|jw t	
 }||| j | jdkrF|| j |   |S )Nr%   zBReshape error. What is defined in data layer is {}, but receive {}r   )r!   arrayrN   r&   r>   rV   r:   rW   r7   r   Z	LoDTensorsetrD   rE   Zset_recursive_sequence_lengthsrQ   rF   )rG   Zarrtr'   r'   r(   done   s"   
zDataToLoDTensorConverter.doneN)	r"   
__module____qualname__rI   rF   rT   rS   rZ   r^   r'   r'   r'   r(   rB      s    rB   c                   @   s$   e Zd Zdd Zdd Zdd ZdS )BatchedTensorProviderc              	   C   s\   || _ || _|| _g | _|| _|D ]}|jdksJ d| jt| j d|j|j	d qd S )Nr   zlod_level must be 0rD   rE   r>   r&   )
rD   
batch_size	generator
converters	drop_lastrE   rU   rB   r>   r&   )rG   	feed_listrD   rc   rd   rf   varr'   r'   r(   rI      s   zBatchedTensorProvider.__init__c                 C   s   dd | j D S )Nc                 S      g | ]}|  qS r'   )r^   rK   cr'   r'   r(   rM          z/BatchedTensorProvider._done.<locals>.<listcomp>)re   rR   r'   r'   r(   _done   s   zBatchedTensorProvider._donec                 c   s    d}|   D ]%}tj|| jD ]
\}}|j| q|d7 }|| jkr,d}|  V  q| j	s;|dkr;|  V  d S dd | jD  d S )Nr   r   c                 S   ri   r'   )rF   rj   r'   r'   r(   rM   
  rl   z2BatchedTensorProvider.__call__.<locals>.<listcomp>)
rd   rO   rP   r   re   rN   rU   rc   rm   rf   )rG   idxeach_sample	each_sloteach_converterr'   r'   r(   __call__   s   

zBatchedTensorProvider.__call__N)r"   r_   r`   rI   rm   rr   r'   r'   r'   r(   ra      s    ra   c                   @   sB   e Zd ZdZdddZdd ZdddZd	d
 Z		dddZdS )r   a4	  
    :api_attr: Static Graph
    
    DataFeeder converts the data that returned by a reader into a data
    structure that can feed into Executor. The reader is usually a 
    python generator that returns a list of mini-batch data entries. 

    Parameters:
        feed_list (list): Variables or names of Variables that need
            to feed.
        place (:ref:`api_fluid_CPUPlace` | :ref:`api_fluid_CUDAPlace` ): 
            place indicates the device (CPU | GPU) the data will be fed into, if 
            you want to feed data into GPU, please using :code:`fluid.CUDAPlace(i)` 
            (:code:`i` represents the GPU id), or if you want to feed data into CPU, 
            please using :code:`fluid.CPUPlace()`.
        program (:ref:`api_fluid_Program` , optional): The Program that will 
            feed data into, if program is None, it will use default_main_program(). 
            Default None.

    Raises:
        :code:`ValueError` - If some Variables are not in this Program.

    Example:
        ..  code-block:: python

            import numpy as np
            import paddle
            import paddle.fluid as fluid
            
            place = fluid.CPUPlace()
            def reader():
                for _ in range(4):
                    yield np.random.random([4]).astype('float32'), np.random.random([3]).astype('float32'),
            
            main_program = fluid.Program()
            startup_program = fluid.Program()
            
            with fluid.program_guard(main_program, startup_program):
                data_1 = fluid.data(name='data_1', shape=[None, 2, 2], dtype='float32')
                data_2 = fluid.data(name='data_2', shape=[None, 1, 3], dtype='float32')
                out = fluid.layers.fc(input=[data_1, data_2], size=2)
                # ...
            feeder = fluid.DataFeeder([data_1, data_2], place)
            
            exe = fluid.Executor(place)
            exe.run(startup_program)
            
            feed_data = feeder.feed(reader())
            
            # print feed_data to view feed results
            # print(feed_data['data_1'])
            # print(feed_data['data_2'])
            
            outs = exe.run(program=main_program,
                            feed=feed_data,
                            fetch_list=[out])
            print(outs)

    Nc                 C   s   g | _ g | _g | _g | _|d u rt }|D ]5}t|tjr%|d	|}t|t
s.td| j |j | j|j | j|j | j|j q|| _d S )Nr   z+Feed list should contain a list of variable)feed_dtypes
feed_namesfeed_shapesfeed_lod_levelr	   r   rO   string_typesblockrh   r   r$   rU   r&   namerE   r>   rD   )rG   rg   rD   programZeach_varr'   r'   r(   rI   J  s    

zDataFeeder.__init__c              	   C   s   g }t j| j| j| jD ]\}}}|t| j|||d q|D ]'}t	|t	|ks7J dt	|t	|f t j||D ]	\}}|
| q>q!i }	t j| j|D ]
\}
}| |	|
< qS|	S )a  
        According to :code:`feed_list` of :code:`DataFeeder` and :code:`iterable` , converts 
        the input into a data structure that can feed into Executor.

        Parameters:
            iterable (generator): user defined python generator to read the raw input data

        Returns: 
            :code:`dict`: a :code:`dict` that contains (variable name - converted tensor) pairs

        Example:
            ..  code-block:: python

                # In this example, reader - generator will return a list of ndarray of 3 elements
                # feed API will convert each ndarray input into a tensor
                # the return result is a dict with keys: data_1, data_2, data_3
                # result['data_1']  a LoD-Tensor with shape of  [5, 2, 1, 3]. 5 is batch size, and [2, 1, 3] is the real shape of data_1.
                # result['data_2'], result['data_3'] are similar.
                import numpy as np
                import paddle.fluid as fluid
                
                def reader(limit=5):
                    for i in range(1, limit + 1):
                        yield np.ones([6]).astype('float32') * i , np.ones([1]).astype('int64') * i, np.random.random([9]).astype('float32')
                
                data_1 = fluid.data(name='data_1', shape=[None, 2, 1, 3])
                data_2 = fluid.data(name='data_2', shape=[None, 1], dtype='int64')
                data_3 = fluid.data(name='data_3', shape=[None, 3, 3], dtype='float32')
                feeder = fluid.DataFeeder(['data_1','data_2', 'data_3'], fluid.CPUPlace())
                
                
                result = feeder.feed(reader())
                print(result['data_1'])
                print(result['data_2'])
                print(result['data_3'])

        rb   zDThe number of fields in data (%d) does not match len(feed_list) (%d))rO   rP   r   rv   ru   rs   rU   rB   rD   rV   rT   rt   r^   )rG   iterable	converterrE   r>   r&   ro   rq   rp   Zret_dictZ	each_namer'   r'   r(   rT   ]  s8   &

zDataFeeder.feedc                 c   s    t | jtjrdd tj| |D }ndd tj| |D }t|t|kr1t	d| j}tj
||D ]\}}|| _| |V  q;|| _dS )a	  
        Similar with feed function, feed_parallel is used with multiple devices (CPU|GPU).
        Here :code:`iterable` is a list of python generators. The data return by each 
        generator in the list will be fed into a separate device.        

        Parameters:
            iterable (list|tuple): list of user-defined python generators. The element 
                number should match the :code:`num_places`.
            num_places (int, optional): the number of devices. If not provided (None), 
                all available devices on the machine will be used. Default None.

        Returns: 
            :code:`generator`: a :code:`generator` that generate dict which contains (variable name - converted tensor) pairs, 
            the total number of dicts will be generated matches with the :code:`num_places`

        .. note::        
            The number of devices - :code:`num_places` should equal to the generator (element of :code:`iterable` ) number

        Example:
            ..  code-block:: python

                import numpy as np
                import paddle.fluid as fluid

                def generate_reader(batch_size, base=0, factor=1):
                    def _reader():
                        for i in range(batch_size):
                            yield np.ones([4]) * factor + base, np.ones([4]) * factor + base + 5
                    return _reader()

                x = fluid.data(name='x', shape=[None, 2, 2])
                y = fluid.data(name='y', shape=[None, 2, 2], dtype='float32')

                z = fluid.layers.elementwise_add(x, y)

                feeder = fluid.DataFeeder(['x','y'], fluid.CPUPlace())
                place_num = 2
                places = [fluid.CPUPlace() for x in range(place_num)]
                data = []
                exe = fluid.Executor(fluid.CPUPlace())
                exe.run(fluid.default_startup_program())
                program = fluid.CompiledProgram(fluid.default_main_program()).with_data_parallel(places=places)

                # print sample feed_parallel r result
                # for item in list(feeder.feed_parallel([generate_reader(5, 0, 1), generate_reader(3, 10, 2)], 2)):
                #     print(item['x'])
                #     print(item['y'])

                reader_list = [generate_reader(5, 0, 1), generate_reader(3, 10, 2)]
                res = exe.run(program=program, feed=list(feeder.feed_parallel(reader_list, 2)), fetch_list=[z])
                print(res)

        c                 S   s   g | ]}t |qS r'   )r   	CUDAPlace)rK   ir'   r'   r(   rM     s    
z,DataFeeder.feed_parallel.<locals>.<listcomp>c                 S   s   g | ]}t  qS r'   )r   ZCPUPlacerJ   r'   r'   r(   rM     s    zfeed_parallel takes multiple mini-batches. Each mini-batch will be feed on each device. The number of devices and number of mini-batches must be same.N)r   rD   r   r}   rO   rP   r   _get_number_of_places_rV   rW   r   rT   )rG   r{   
num_placesZplacesrD   pbatchr'   r'   r(   feed_parallel  s&   6
zDataFeeder.feed_parallelc                 C   s.   |d urt |S t| jtjrtt S t S r+   )intr   rD   r   r}   rV   r   r   )rG   r   r'   r'   r(   r     s
   
z!DataFeeder._get_number_of_places_Tc                    s    fdd}|S )a  
        Decorate the reader (generator) to fit multiple devices. The reader generate
        multiple mini-batches. Each mini-batch will be fed into a single device.

        Parameters:
            reader(generator): a user defined python generator used to get :code:`mini-batch` of data.
                A :code:`mini-batch` can be regarded as a python generator that returns batches of input 
                entities, just like the below :code:`_mini_batch` in the code example.                      
            multi_devices(bool): indicate whether to use multiple devices or not.
            num_places(int, optional): if :code:`multi_devices` is True, you can specify the number
                of devices(CPU|GPU) to use, if multi_devices is None, the function will use all the
                devices of the current machine. Default None.
            drop_last(bool, optional): whether to drop the last round of data if it is not enough to 
                feed all devices. Default True.

        Returns: 
            :code:`generator`: a new :code:`generator` which return converted dicts that can be fed into Executor
            
        Raises:
            :code:`ValueError`: If drop_last is False and the data cannot fit devices perfectly.

        Example:
            ..  code-block:: python

                import numpy as np
                import paddle
                import paddle.fluid as fluid
                import paddle.fluid.compiler as compiler
                
                def reader():
                    def _mini_batch(batch_size):
                        for i in range(batch_size):
                            yield np.random.random([16]).astype('float32'), np.random.randint(10, size=[1])

                    for _ in range(10):
                        yield _mini_batch(np.random.randint(1, 10))
                
                place_num = 3
                places = [fluid.CPUPlace() for _ in range(place_num)]
                
                # a simple network sample
                data = fluid.data(name='data', shape=[None, 4, 4], dtype='float32')
                label = fluid.data(name='label', shape=[None, 1], dtype='int64')
                hidden = fluid.layers.fc(input=data, size=10)
                
                feeder = fluid.DataFeeder(place=places[0], feed_list=[data, label])
                reader = feeder.decorate_reader(reader, multi_devices=True, num_places=3, drop_last=True)
                
                exe = fluid.Executor(places[0])
                exe.run(fluid.default_startup_program())
                compiled_prog = compiler.CompiledProgram(
                         fluid.default_main_program()).with_data_parallel(places=places)
                
                for i,data in enumerate(reader()):
                    # print data if you like
                    # print(i, data)
                    ret = exe.run(compiled_prog, feed=data, fetch_list=[hidden])
                    print(ret)

        c                  3   s    s D ]}  | V  qd S }g }  D ]}| | t| |kr3t| |V  g } q s@t| dkrBtdd S d S )Nr   zwThe data batch which cannot fit for devices will be dropped is not implementation. Other strategies are not implemented)rT   r   rU   rV   listr   rW   )r@   numr   rf   multi_devicesr   readerrG   r'   r(   __reader_creator__1  s$   



z6DataFeeder.decorate_reader.<locals>.__reader_creator__r'   )rG   r   r   r   rf   r   r'   r   r(   decorate_reader  s   BzDataFeeder.decorate_readerr+   )NT)	r"   r_   r`   __doc__rI   rT   r   r   r   r'   r'   r'   r(   r     s    
<
=M)r*   )2
__future__r   r*   r   numpyr!   osrO   Z	six.movesr   r   r   multiprocessingr<   Z	frameworkr   r	   r
   r   r   r   r   __all__r   r   BOOLZFP16ZBF16ZFP32ZFP64ZINT8ZINT16ZINT32ZINT64ZUINT8Z	COMPLEX64Z
COMPLEX128r   r)   r3   r,   r-   r   r6   r   rA   objectrB   ra   r   r'   r'   r'   r(   <module>   sJ   











&

	+

8&