o
    Me߱                     @   s   d Z ddlmZ ddlmZ ddlmZ ddlmZ g dZ	G d	d
 d
e
ZG dd de
ZG dd deZG dd deZG dd deZG dd deZdS )zBThis is definition of dataset class, which is high performance IO.    )data_feed_pb2)text_format   )core   )
deprecated)DatasetFactoryInMemoryDatasetQueueDatasetc                   @   s"   e Zd ZdZdd ZdddZdS )	r   a^  
    DatasetFactory is a factory which create dataset by its name,
    you can create "QueueDataset" or "InMemoryDataset", or "FileInstantDataset",
    the default is "QueueDataset".

    Example:
        .. code-block:: python

          import paddle.fluid as fluid
          dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")

    c                 C   s   dS ) Init. N selfr   r   DD:\Projects\ConvertPro\env\Lib\site-packages\paddle/fluid/dataset.py__init__&   s   zDatasetFactory.__init__r
   c                 C   s&   z	t  |  }|W S    td| )a  
        Create "QueueDataset" or "InMemoryDataset", or "FileInstantDataset",
        the default is "QueueDataset".

        Args:
            datafeed_class(str): datafeed class name, QueueDataset or InMemoryDataset.
                                 Default is QueueDataset.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset()

        z datafeed class %s does not exist)globals
ValueError)r   Zdatafeed_classdatasetr   r   r   create_dataset*   s   zDatasetFactory.create_datasetN)r
   )__name__
__module____qualname____doc__r   r   r   r   r   r   r      s    r   c                   @   s   e Zd ZdZdd Zdd Zdd Zdd	 Zd,ddZdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* Zd+S )-DatasetBasez Base dataset class. c                 C   s:   t  | _d| j_td| _d| _g | _d| _	d| _
dS )r   catZMultiSlotDatasetr   FN)r   ZDataFeedDesc
proto_descpipe_commandr   Datasetr   
thread_numfilelist
use_ps_gpupsgpur   r   r   r   r   E   s   

zDatasetBase.__init__c                 C      || j _dS )a  
        Set pipe command of current dataset
        A pipe command is a UNIX pipeline command that can be used only

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset()
              dataset.set_pipe_command("python my_script.py")

        Args:
            pipe_command(str): pipe command

        N)r   r   )r   r   r   r   r   set_pipe_commandQ   s   zDatasetBase.set_pipe_commandc                 C   r"   )aJ  
        Set so parser name of current dataset

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset()
              dataset.set_so_parser_name("./abc.so")

        Args:
            pipe_command(str): pipe command

        N)r   so_parser_name)r   r$   r   r   r   set_so_parser_namec      zDatasetBase.set_so_parser_namec                 C   r"   )aa  
        Set rank_offset for merge_pv. It set the message of Pv.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset()
              dataset.set_rank_offset("rank_offset")

        Args:
            rank_offset(str): rank_offset's name

        N)r   rank_offset)r   r'   r   r   r   set_rank_offsett   r&   zDatasetBase.set_rank_offsetTc                 C   s   |r	| j || || _dS )a  
        set fea eval mode for slots shuffle to debug the importance level of
        slots(features), fea_eval need to be set True for slots shuffle.
        
        Args:
            record_candidate_size(int): size of instances candidate to shuffle 
                                        one slot
            fea_eval(bool): whether enable fea eval mode to enable slots shuffle.
                            default is True.
            
        Examples:
            .. code-block:: python

            import paddle.fluid as fluid
            dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
            dataset.set_fea_eval(1000000, True)

        N)r   set_fea_evalfea_eval)r   Zrecord_candidate_sizer*   r   r   r   r)      s   
zDatasetBase.set_fea_evalc                 C   s"   | j rt|}| j| dS dS a  
        Slots Shuffle 
        Slots Shuffle is a shuffle method in slots level, which is usually used 
        in sparse feature with large scale of instances. To compare the metric, i.e.
        auc while doing slots shuffle on one or several slots with baseline to 
        evaluate the importance level of slots(features).
        
        Args:
            slots(list[string]): the set of slots(string) to do slots shuffle.

        Examples:
            import paddle.fluid as fluid
            dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
            dataset.set_merge_by_lineid()
            #suppose there is a slot 0
            dataset.slots_shuffle(['0'])
        N)r*   setr   slots_shuffler   slotsZ	slots_setr   r   r   r-      s   zDatasetBase.slots_shufflec                 C   r"   )aG  
        Set batch size. Will be effective during training

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset()
              dataset.set_batch_size(128)

        Args:
            batch_size(int): batch size

        N)r   
batch_size)r   r0   r   r   r   set_batch_size   r&   zDatasetBase.set_batch_sizec                 C   r"   )aW  
        Set pv batch size. It will be effective during enable_pv_merge

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset()
              dataset.set_pv_batch(128)
        Args:
            pv_batch_size(int): pv batch size

        N)r   pv_batch_size)r   r2   r   r   r   set_pv_batch_size      zDatasetBase.set_pv_batch_sizec                 C      | j | || _dS )a:  
        Set thread num, it is the num of readers.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset()
               dataset.set_thread(12)

        Args:
            thread_num(int): thread num
        N)r   set_thread_numr   r   r   r   r   r   
set_thread      
zDatasetBase.set_threadc                 C   r5   )a@  
        Set file list in current worker.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset()
              dataset.set_filelist(['a.txt', 'b.txt'])

        Args:
            filelist(list): file list
        N)r   set_filelistr   )r   r   r   r   r   r:      r9   zDatasetBase.set_filelistc                 C   s   || j _d S N)r   
input_type)r   r<   r   r   r   set_input_type   s   zDatasetBase.set_input_typec                 C   s   | j j}|D ]D}|j }d|_|j|_|jdkr#d|_|j	|j |j
tjjjkr/d|_q|j
tjjjkr;d|_q|j
tjjjkrGd|_qtddS )a?  
        Set Variables which you will use.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset()
              dataset.set_use_var([data, label])

        Args:
            var_list(list): variable list
        Tr   floatZuint64Zuint32zQCurrently, fluid.dataset only supports dtype=float32, dtype=int32 and dtype=int64N)r   Zmulti_slot_descr/   addZis_usednameZ	lod_levelZis_denseshapeextendZdtyper   ZVarDescZVarTypeZFP32typeZINT64ZINT32r   )r   Zvar_listZ
multi_slotvarZslot_varr   r   r   set_use_var   s$   

zDatasetBase.set_use_varc                 C   s   | j || dS )ae  
        Set hdfs config: fs name ad ugi

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset()
              dataset.set_hdfs_config("my_fs_name", "my_fs_ugi")

        Args:
            fs_name(str): fs name
            fs_ugi(str): fs ugi
        N)r   set_hdfs_config)r   Zfs_nameZfs_ugir   r   r   rF     s   zDatasetBase.set_hdfs_configc                 C      | j | dS )aa  
        Set customized download cmd: download_cmd

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset()
              dataset.set_download_cmd("./read_from_afs")

        Args:
            download_cmd(str): customized download command
        N)r   set_download_cmd)r   Zdownload_cmdr   r   r   rH   *  s   zDatasetBase.set_download_cmdc                 C   sH   | j t| jkrt| j| _ | j| j  | j|   | j  dS )h
        Set data_feed_desc before load or shuffle,
        user no need to call this function.
        N)r   lenr   r   r6   set_data_feed_descdesccreate_readersr   r   r   r   _prepare_to_run:  s
   zDatasetBase._prepare_to_runc                 C   s,   d| _ t sd| _ dS | j r|| _dS dS )zQ
        set use_ps_gpu flag

        Args:
            use_ps_gpu: bool
        TFN)r    r   _is_compiled_with_heterpsr!   )r   r!   r   r   r   _set_use_ps_gpuE  s   

zDatasetBase._set_use_ps_gpuc                 C   s   | j   d S r;   )r   Zdestroy_readersr   r   r   r   _finish_to_runS  s   zDatasetBase._finish_to_runc                 C   s   t | jS )a7  
        Returns a protobuf message for this DataFeedDesc

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset()
              print(dataset.desc())

        Returns:
            A string message
        )r   ZMessageToStringr   r   r   r   r   rL   V  r4   zDatasetBase.descc                 C      d S r;   r   r7   r   r   r   _dynamic_adjust_before_trainf     z(DatasetBase._dynamic_adjust_before_trainc                 C   rR   r;   r   r   r   r   r   _dynamic_adjust_after_traini  rT   z'DatasetBase._dynamic_adjust_after_trainN)T)r   r   r   r   r   r#   r%   r(   r)   r-   r1   r3   r8   r:   r=   rE   rF   rH   rN   rP   rQ   rL   rS   rU   r   r   r   r   r   B   s,    
!r   c                       s2  e Zd ZdZeddd fddZeddddd	 Zedd
ddd Zeddddd Zeddddd Z	eddddd Z
eddddd Zeddddd Zdd Zdd Zedd dd!d" Zd#d$ Zd%d& Zd'd( Zd)d* Zedd+dd`d-d.Zedd/ddad1d2Zedd3ddbd5d6Zedd7dd8d9 Zedd:dd;d< Zd=d> Zedd?ddcdAdBZeddCddddEdFZeddGddHdI ZeddJddKdL ZeddMddedOdPZeddQddRdS ZdTdU Z eddVddddWdXZ!eddYddddZd[Z"dcd\d]Z#d^d_ Z$  Z%S )fr	   z
    InMemoryDataset, it will load data into memory
    and shuffle data before training.
    This class should be created by DatasetFactory

    Example:
        dataset = paddle.fluid.DatasetFactory().create_dataset("InMemoryDataset")
    2.0.0z"paddle.distributed.InMemoryDatasetZsinceZ	update_toc                    s\   t t|   d| j_d| _d| _d| _d| _d| _	d| _
d| _d| _d| _d| _d| _dS )r   ZMultiSlotInMemoryDataFeedNFT)superr	   r   r   r@   fleet_send_batch_sizeis_user_set_queue_num	queue_numparse_ins_idparse_contentparse_logkeymerge_by_sidenable_pv_mergemerge_by_lineidfleet_send_sleep_secondstrainer_numr   	__class__r   r   r   w  s   
zInMemoryDataset.__init__z1paddle.distributed.InMemoryDataset._set_feed_typec                 C   s(   || j _| j jdkrtd| _dS dS )z$
        Set data_feed_desc
        ZSlotRecordInMemoryDataFeedZSlotRecordDatasetN)r   r@   r   r   r   )r   Zdata_feed_typer   r   r   set_feed_type  s   zInMemoryDataset.set_feed_typez2paddle.distributed.InMemoryDataset._prepare_to_runc                 C   s   | j dkrd| _ | j| j  | jdu r| j | _| j| j | j| j | j| j | j	| j
 | j| j | j| j | j|   | j  | j  dS )rI   r   r   N)r   r   r6   r\   set_queue_numset_parse_ins_idr]   set_parse_contentr^   set_parse_logkeyr_   set_merge_by_sidr`   set_enable_pv_mergera   rK   rL   Zcreate_channelrM   r   r   r   r   rN     s   


zInMemoryDataset._prepare_to_runz?paddle.distributed.InMemoryDataset._dynamic_adjust_before_trainc                 C   s:   | j s| jr| j|d n| j|d | j| d S NTF)r[   r    r   dynamic_adjust_channel_numdynamic_adjust_readers_numr7   r   r   r   rS     s
   z,InMemoryDataset._dynamic_adjust_before_trainz>paddle.distributed.InMemoryDataset._dynamic_adjust_after_trainc                 C   s@   | j s| jr| j| jd n| j| jd | j| j d S rn   )r[   r    r   ro   r   rp   r   r   r   r   rU     s
   z+InMemoryDataset._dynamic_adjust_after_trainz1paddle.distributed.InMemoryDataset._set_queue_numc                 C   s   d| _ || _dS )au  
        Set Dataset output queue num, training threads get data from queues

        Args:
            queue_num(int): dataset output queue num

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              dataset.set_queue_num(12)

        TN)r[   r\   )r   r\   r   r   r   rh     s   
zInMemoryDataset.set_queue_numz4paddle.distributed.InMemoryDataset._set_parse_ins_idc                 C   
   || _ dS )a[  
        Set id Dataset need to parse insid

        Args:
            parse_ins_id(bool): if parse ins_id or not

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              dataset.set_parse_ins_id(True)

        N)r]   )r   r]   r   r   r   ri        
z InMemoryDataset.set_parse_ins_idz5paddle.distributed.InMemoryDataset._set_parse_contentc                 C   rq   )a`  
        Set if Dataset need to parse content

        Args:
            parse_content(bool): if parse content or not

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              dataset.set_parse_content(True)

        N)r^   )r   r^   r   r   r   rj     rr   z!InMemoryDataset.set_parse_contentc                 C   rq   )a]  
        Set if Dataset need to parse logkey

        Args:
            parse_content(bool): if parse logkey or not

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              dataset.set_parse_logkey(True)

        N)r_   )r   r_   r   r   r   rk        
z InMemoryDataset.set_parse_logkeyc                 C   rq   )a8  
        Set trainer num

        Args:
            trainer_num(int): trainer num

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              dataset._set_trainer_num(1)

        N)rd   )r   rd   r   r   r   _set_trainer_num  rs   z InMemoryDataset._set_trainer_numz4paddle.distributed.InMemoryDataset._set_merge_by_sidc                 C   rq   )au  
        Set if Dataset need to merge sid. If not, one ins means one Pv.

        Args:
            merge_by_sid(bool): if merge sid or not

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              dataset.set_merge_by_sid(True)

        N)r`   )r   r`   r   r   r   rl     rr   z InMemoryDataset.set_merge_by_sidc                 C   rq   )ab  
        Set if Dataset need to merge pv.

        Args:
            enable_pv_merge(bool): if enable_pv_merge or not

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              dataset.set_enable_pv_merge(True)

        N)ra   )r   ra   r   r   r   rm   2  rs   z#InMemoryDataset.set_enable_pv_mergec                 C      | j   dS )a  
        Merge pv instance and convey it from input_channel to input_pv_channel. 
        It will be effective when enable_pv_merge_ is True.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.load_into_memory()
              dataset.preprocess_instance()

        N)r   preprocess_instancer   r   r   r   rv   C  s   z#InMemoryDataset.preprocess_instancec                 C   rG   )a  
        Set current phase in train. It is useful for untest.
        current_phase : 1 for join, 0 for update.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.load_into_memory()
              dataset.set_current_phase(1)

        N)r   set_current_phase)r   Zcurrent_phaser   r   r   rw   U  s   z!InMemoryDataset.set_current_phasec                 C   ru   )a  
        Divide pv instance and convey it to input_channel.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.load_into_memory()
              dataset.preprocess_instance()
              exe.train_from_dataset(dataset)
              dataset.postprocess_instance()

        N)r   postprocess_instancer   r   r   r   rx   g     z$InMemoryDataset.postprocess_instancez=paddle.distributed.InMemoryDataset._set_fleet_send_batch_size   c                 C   rq   )ar  
        Set fleet send batch size, default is 1024

        Args:
            fleet_send_batch_size(int): fleet send batch size

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              dataset.set_fleet_send_batch_size(800)

        N)rZ   )r   rZ   r   r   r   set_fleet_send_batch_sizez     
z)InMemoryDataset.set_fleet_send_batch_sizez@paddle.distributed.InMemoryDataset._set_fleet_send_sleep_secondsr   c                 C   rq   )as  
        Set fleet send sleep time, default is 0

        Args:
            fleet_send_sleep_seconds(int): fleet send sleep time

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              dataset.set_fleet_send_sleep_seconds(2)

        N)rc   )r   rc   r   r   r   set_fleet_send_sleep_seconds  r|   z,InMemoryDataset.set_fleet_send_sleep_secondsz7paddle.distributed.InMemoryDataset._set_merge_by_lineidr   c                 C   s   | j | d| _d| _dS )a  
        Set merge by line id, instances of same line id will be merged after
        shuffle, you should parse line id in data generator.

        Args:
            merge_size(int): ins size to merge. default is 2.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              dataset.set_merge_by_lineid()

        TN)r   set_merge_by_lineidrb   r]   )r   Z
merge_sizer   r   r   r~     s   
z#InMemoryDataset.set_merge_by_lineidz@paddle.distributed.InMemoryDataset._set_generate_unique_feasignsc                 C   s   | j | || _|| _d S r;   )r   set_generate_unique_feasignsZgen_uni_feasignsZlocal_shard_num)r   Zgenerate_uni_feasigns	shard_numr   r   r   r     s   
z,InMemoryDataset.set_generate_unique_feasignsz@paddle.distributed.InMemoryDataset._generate_local_tables_unlockc                 C   s   | j ||||| d S r;   )r   generate_local_tables_unlock)r   Ztable_idZfea_dimZread_thread_numZconsume_thread_numr   r   r   r   r     s   
z,InMemoryDataset.generate_local_tables_unlockc                 C   sZ   t |dd }t |dd }t |dd }| jr)t r+| j||| dS dS dS )a  
        :api_attr: Static Graph

        Set training date for pull sparse parameters, saving and loading model. Only used in psgpu

        Args:
            date(str): training date(format : YYMMDD). eg.20211111

        Examples:
            .. code-block:: python

                import paddle.fluid as fluid

                dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
                dataset.set_date("20211111")
        N      )intr    r   rO   r!   set_dater   dateyearmonthdayr   r   r   r     s   zInMemoryDataset.set_datez3paddle.distributed.InMemoryDataset.load_into_memoryFc                 C   sF   |    | js| j  dS t r!| j| j | j| dS dS )a  
        Load data into memory

         Args:
            is_shuffle(bool): whether to use local shuffle, default is False

        Examples:
            .. code-block:: python

              # required: skiptest
              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.load_into_memory()
        N)rN   r    r   load_into_memoryr   rO   r!   Zset_dataset)r   Z
is_shuffler   r   r   r     s   z InMemoryDataset.load_into_memoryz6paddle.distributed.InMemoryDataset.preload_into_memoryNc                 C   s:   |    |du r| j}| j| | j  | j  dS )a  
        Load data into memory in async mode

        Args:
            thread_num(int): preload thread num

        Examples:
            .. code-block:: python

              # required: skiptest
              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.preload_into_memory()
              dataset.wait_preload_done()
        N)rN   r   r   Zset_preload_thread_numZcreate_preload_readerspreload_into_memoryr7   r   r   r   r     s   
z#InMemoryDataset.preload_into_memoryz4paddle.distributed.InMemoryDataset.wait_preload_donec                 C   s   | j   | j   dS )a  
        Wait preload_into_memory done

        Examples:
            .. code-block:: python

              # required: skiptest
              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.preload_into_memory()
              dataset.wait_preload_done()
        N)r   wait_preload_doneZdestroy_preload_readersr   r   r   r   r     s   
z!InMemoryDataset.wait_preload_donez0paddle.distributed.InMemoryDataset.local_shufflec                 C   ru   )a  
        Local shuffle

        Examples:
            .. code-block:: python

              # required: skiptest
              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.load_into_memory()
              dataset.local_shuffle()
        N)r   local_shuffler   r   r   r   r   0  ry   zInMemoryDataset.local_shufflez1paddle.distributed.InMemoryDataset.global_shuffle   c                 C   s.  |dur!t |drtd |  n|j  | jdkr!| | _| jdu r)d| _| jdu r1d| _| j	  | j
| j | j| j | j| j |dur^t |drY|  n|j  | j| |durwt |drr|  n|j  | jr| j  |durt |dr|  dS |j  dS dS )ag  
        Global shuffle.
        Global shuffle can be used only in distributed mode. i.e. multiple
        processes on single machine or multiple machines training together.
        If you run in distributed mode, you should pass fleet instead of None.

        Examples:
            .. code-block:: python

              # required: skiptest
              import paddle.fluid as fluid
              from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.load_into_memory()
              dataset.global_shuffle(fleet)

        Args:
            fleet(Fleet): fleet singleton. Default None.
            thread_num(int): shuffle thread num. Default is 12.

        Nbarrier_workerzpscore fleetrX   rz   r   )hasattrprintr   _role_makerrd   Z
worker_numrZ   rc   r   Z"register_client2client_msg_handlerZset_trainer_numr{   r}   global_shufflerb   )r   fleetr   r   r   r   r   C  s>   















zInMemoryDataset.global_shufflez1paddle.distributed.InMemoryDataset.release_memoryc                 C   ru   )a8  
        :api_attr: Static Graph
        
        Release InMemoryDataset memory data, when data will not be used again.

        Examples:
            .. code-block:: python

              # required: skiptest
              import paddle.fluid as fluid
              from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.load_into_memory()
              dataset.global_shuffle(fleet)
              exe = fluid.Executor(fluid.CPUPlace())
              exe.run(fluid.default_startup_program())
              exe.train_from_dataset(fluid.default_main_program(), dataset)
              dataset.release_memory()

        N)r   release_memoryr   r   r   r   r     s   zInMemoryDataset.release_memoryc                 C   s
   | j  S )a  
        Get memory data size of Pv, user can call this function to know the pv num
        of ins in all workers after load into memory.

        Note:
            This function may cause bad performance, because it has barrier

        Returns:
            The size of memory pv data.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.load_into_memory()
              print dataset.get_pv_data_size()

        )r   get_pv_data_sizer   r   r   r   r     s   
z InMemoryDataset.get_pv_data_sizez7paddle.distributed.InMemoryDataset.get_memory_data_sizec                 C   sL   ddl }| j }||g}|dur"|d }|j|| |d S |d S )a6  
        Get memory data size, user can call this function to know the num
        of ins in all workers after load into memory.

        Note:
            This function may cause bad performance, because it has barrier

        Args:
            fleet(Fleet): Fleet Object.

        Returns:
            The size of memory data.

        Examples:
            .. code-block:: python

              # required: skiptest
              import paddle.fluid as fluid
              from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.load_into_memory()
              print dataset.get_memory_data_size(fleet)

        r   N)numpyr   get_memory_data_sizearrayr   all_reduce_workerr   r   npZlocal_data_sizeZglobal_data_sizer   r   r   r     s   
z$InMemoryDataset.get_memory_data_sizez8paddle.distributed.InMemoryDataset.get_shuffle_data_sizec                 C   st   ddl }| j }||g}td| |dur6|d }t|dr+|j|}|d S |j	|| |d S |d S )a  
        Get shuffle data size, user can call this function to know the num
        of ins in all workers after local/global shuffle.

        Note:
            This function may cause bad performance to local shuffle,
            because it has barrier. It does not affect global shuffle.

        Args:
            fleet(Fleet): Fleet Object.

        Returns:
            The size of shuffle data.

        Examples:
            .. code-block:: python

              # required: skiptest
              import paddle.fluid as fluid
              from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.load_into_memory()
              dataset.global_shuffle(fleet)
              print dataset.get_shuffle_data_size(fleet)

        r   Nz global shuffle local_data_size: util)
r   r   get_shuffle_data_sizer   r   r   r   Z
all_reducer   r   r   r   r   r   r     s    


z%InMemoryDataset.get_shuffle_data_sizec                 C   rG   )zO
        Set heter ps mode
        user no need to call this function.
        N)r   Zset_heter_ps)r   Zenable_heter_psr   r   r   _set_heter_ps	  s   zInMemoryDataset._set_heter_psc                 C   s   | dd| jj_| dd| jj_| dd| jj_| dd| jj_| d	d
| jj_| dd| jj_| dd| jj_	| dd| jj_
| dd| jj_| dd| jj_| jd dS )a  
        Set graph config, user can set graph config in gpu graph mode. 

        Args:
            config(dict): config dict.

        Returns:
            The size of shuffle data.

        Examples:
            .. code-block:: python

              # required: skiptest
              import paddle.fluid as fluid
              from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              graph_config = {"walk_len": 24,
                    "walk_degree": 10,
                    "once_sample_startid_len": 80000,
                    "sample_times_one_chunk": 5,
                    "window": 3,
                    "debug_mode": 0,
                    "batch_size": 800,
                    "meta_path": "cuid2clk-clk2cuid;cuid2conv-conv2cuid;clk2cuid-cuid2clk;clk2cuid-cuid2conv",
                    "gpu_graph_training": 1}
              dataset.set_graph_config(graph_config)

        walk_degreer   walk_len   window   once_sample_startid_leni@  sample_times_one_chunk
   r0   
debug_moder   first_node_type 	meta_pathgpu_graph_trainingTN)getr   Zgraph_configr   r   r   r   r   r0   r   r   r   r   r   Zset_gpu_graph_mode)r   configr   r   r   set_graph_config  s&   



z InMemoryDataset.set_graph_config)rz   )r   )r   )Fr;   )Nr   )&r   r   r   r   r   r   rg   rN   rS   rU   rh   ri   rj   rk   rt   rl   rm   rv   rw   rx   r{   r}   r~   r   r   r   r   r   r   r   r   r   r   r   r   r   r   __classcell__r   r   re   r   r	   m  s    
	











;
%
+r	   c                       sF   e Zd ZdZ fddZeddddd Zd	d
 ZdddZ  Z	S )r
   z
    QueueDataset, it will process data streamly.

    Examples:
        .. code-block:: python

          import paddle.fluid as fluid
          dataset = fluid.DatasetFactory().create_dataset("QueueDataset")

    c                       t t|   d| j_dS )z`
        Initialize QueueDataset
        This class should be created by DatasetFactory
        ZMultiSlotDataFeedN)rY   r
   r   r   r@   r   re   r   r   r   J     zQueueDataset.__init__rV   z/paddle.distributed.QueueDataset._prepare_to_runrW   c                 C   sf   | j t| jkrt| j| _ | j dkrd| _ | j| j  | j| j | j|   | j  dS )zp
        Set data_feed_desc/thread num/filelist before run,
        user no need to call this function.
        r   r   N)	r   rJ   r   r   r6   r:   rK   rL   rM   r   r   r   r   rN   R  s   
zQueueDataset._prepare_to_runc                 C      t d)a  
        Local shuffle data.

        Local shuffle is not supported in QueueDataset
        NotImplementedError will be raised

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("QueueDataset")
              dataset.local_shuffle()

        Raises:
            NotImplementedError: QueueDataset does not support local shuffle

        zYQueueDataset does not support local shuffle, please use InMemoryDataset for local_shuffleNotImplementedErrorr   r   r   r   r   b  s   zQueueDataset.local_shuffleNc                 C   r   )aa  
        Global shuffle data.

        Global shuffle is not supported in QueueDataset
        NotImplementedError will be raised

        Args:
            fleet(Fleet): fleet singleton. Default None.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
              dataset = fluid.DatasetFactory().create_dataset("QueueDataset")
              dataset.global_shuffle(fleet)

        Raises:
            NotImplementedError: QueueDataset does not support global shuffle

        z[QueueDataset does not support global shuffle, please use InMemoryDataset for global_shuffler   r   r   r   r   r   r   x  s   zQueueDataset.global_shuffler;   )
r   r   r   r   r   r   rN   r   r   r   r   r   re   r   r
   >  s    
r
   c                       s2   e Zd ZdZ fddZdd Zd	ddZ  ZS )
FileInstantDatasetz
    FileInstantDataset, it will process data streamly.

    Examples:
        .. code-block:: python

          import paddle.fluid as fluid
          dataset = fluid.DatasetFactory.create_dataset("FileInstantDataset")
    c                    r   )zf
        Initialize FileInstantDataset
        This class should be created by DatasetFactory
        ZMultiSlotFileInstantDataFeedN)rY   r   r   r   r@   r   re   r   r   r     r   zFileInstantDataset.__init__c                 C   r   )zY
        Local shuffle
        FileInstantDataset does not support local shuffle
        z_FileInstantDataset does not support local shuffle, please use InMemoryDataset for local_shuffler   r   r   r   r   r        z FileInstantDataset.local_shuffleNc                 C   r   )z[
        Global shuffle
        FileInstantDataset does not support global shuffle
        zaFileInstantDataset does not support global shuffle, please use InMemoryDataset for global_shuffler   r   r   r   r   r     r   z!FileInstantDataset.global_shuffler;   )r   r   r   r   r   r   r   r   r   r   re   r   r     s
    
	r   c                       sh   e Zd ZdZ fddZdd Zdd Zdd	 Zd
d Zdd Z	dd Z
dd Zdd Zdd Z  ZS )BoxPSDatasetz
    BoxPSDataset: derived from InMemoryDataset.

    Examples:
        .. code-block:: python

          import paddle.fluid as fluid
          dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset")
    c                    s(   t t|   t| j| _d| j_dS )z`
        Initialize BoxPSDataset
        This class should be created by DatasetFactory
        ZPaddleBoxDataFeedN)	rY   r   r   r   ZBoxPSr   boxpsr   r@   r   re   r   r   r     s   zBoxPSDataset.__init__c                 C   sD   t |dd }t |dd }t |dd }| j||| dS )z%
        Workaround for date
        Nr   r   )r   r   r   r   r   r   r   r     s   zBoxPSDataset.set_datec                 C   ru   )a:  
        Begin Pass
        Notify BoxPS to load sparse parameters of next pass to GPU Memory 

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset")
              dataset.begin_pass()
        N)r   
begin_passr   r   r   r   r     s   zBoxPSDataset.begin_passc                 C   rG   )a  
        End Pass
        Notify BoxPS that current pass ended 
        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset")
              dataset.end_pass(True)
        N)r   end_pass)r   Zneed_save_deltar   r   r   r     s   zBoxPSDataset.end_passc                 C   ru   )a  
        Wait async preload done
        Wait Until Feed Pass Done
        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.preload_into_memory()
              dataset.wait_preload_done()
        N)r   Zwait_feed_pass_doner   r   r   r   r     s   zBoxPSDataset.wait_preload_donec                 C      |    | j  dS )a  
        Load next pass into memory and notify boxps to fetch its emb from SSD
        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.load_into_memory()
	    N)rN   r   r   r   r   r   r   r        zBoxPSDataset.load_into_memoryc                 C   r   )a  
        Begin async preload next pass while current pass may be training
        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.preload_into_memory()
        N)rN   r   r   r   r   r   r   r     r   z BoxPSDataset.preload_into_memoryc                 C   s$   | j s
| j|d | j| d S )NT)r[   r   ro   rp   r7   r   r   r   rS     s   z)BoxPSDataset._dynamic_adjust_before_trainc                 C   rR   r;   r   r   r   r   r   rU   $  rT   z(BoxPSDataset._dynamic_adjust_after_trainc                 C   s   t |}| j| dS r+   )r,   r   r-   r.   r   r   r   r-   '  s   zBoxPSDataset.slots_shuffle)r   r   r   r   r   r   r   r   r   r   r   rS   rU   r-   r   r   r   re   r   r     s    
		r   N)r   Zpaddle.fluid.protor   Zgoogle.protobufr   r   r   utilsr   __all__objectr   r   r	   r
   r   r   r   r   r   r   <module>   s&   *  -     VU&